Redirect URIs must match exactly. (#191)

* Test redirect_uri construction

This was a test marked as TODO.

* Remove duplicate test

* Add tests to exactly match redirect URIs

* Redirect URIs must match exactly.

To quote from the specification at
http://openid.net/specs/openid-connect-core-1_0.html#AuthRequest:

Redirection URI to which the response will be sent. This URI MUST
exactly match one of the Redirection URI values for the Client
pre-registered at the OpenID Provider, with the matching performed as
described in Section 6.2.1 of [RFC3986] (Simple String Comparison).
This commit is contained in:
Jan Brauer 2017-07-07 09:07:21 +02:00 committed by Wojciech Bartosiak
parent f07327a713
commit 1215c27d7e
5 changed files with 70 additions and 56 deletions

View file

@ -30,7 +30,7 @@ from oidc_provider.models import (
UserConsent, UserConsent,
) )
from oidc_provider import settings from oidc_provider import settings
from oidc_provider.lib.utils.common import cleanup_url_from_query_string, get_browser_state_or_default from oidc_provider.lib.utils.common import get_browser_state_or_default
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -93,8 +93,7 @@ class AuthorizeEndpoint(object):
if self.is_authentication and not self.params['redirect_uri']: if self.is_authentication and not self.params['redirect_uri']:
logger.debug('[Authorize] Missing redirect uri.') logger.debug('[Authorize] Missing redirect uri.')
raise RedirectUriError() raise RedirectUriError()
clean_redirect_uri = cleanup_url_from_query_string(self.params['redirect_uri']) if not (self.params['redirect_uri'] in self.client.redirect_uris):
if not (clean_redirect_uri in self.client.redirect_uris):
logger.debug('[Authorize] Invalid redirect uri: %s', self.params['redirect_uri']) logger.debug('[Authorize] Invalid redirect uri: %s', self.params['redirect_uri'])
raise RedirectUriError() raise RedirectUriError()

View file

@ -3,7 +3,6 @@ import hashlib
import logging import logging
import re import re
from django.contrib.auth import authenticate from django.contrib.auth import authenticate
from oidc_provider.lib.utils.common import cleanup_url_from_query_string
try: try:
from urllib.parse import unquote from urllib.parse import unquote
@ -43,8 +42,7 @@ class TokenEndpoint(object):
self.params['client_id'] = client_id self.params['client_id'] = client_id
self.params['client_secret'] = client_secret self.params['client_secret'] = client_secret
self.params['redirect_uri'] = unquote( self.params['redirect_uri'] = self.request.POST.get('redirect_uri', '')
self.request.POST.get('redirect_uri', '').split('?', 1)[0])
self.params['grant_type'] = self.request.POST.get('grant_type', '') self.params['grant_type'] = self.request.POST.get('grant_type', '')
self.params['code'] = self.request.POST.get('code', '') self.params['code'] = self.request.POST.get('code', '')
self.params['state'] = self.request.POST.get('state', '') self.params['state'] = self.request.POST.get('state', '')
@ -93,8 +91,7 @@ class TokenEndpoint(object):
raise TokenError('invalid_client') raise TokenError('invalid_client')
if self.params['grant_type'] == 'authorization_code': if self.params['grant_type'] == 'authorization_code':
clean_redirect_uri = cleanup_url_from_query_string(self.params['redirect_uri']) if not (self.params['redirect_uri'] in self.client.redirect_uris):
if not (clean_redirect_uri in self.client.redirect_uris):
logger.debug('[Token] Invalid redirect uri: %s', self.params['redirect_uri']) logger.debug('[Token] Invalid redirect uri: %s', self.params['redirect_uri'])
raise TokenError('invalid_client') raise TokenError('invalid_client')

View file

@ -11,19 +11,6 @@ except ImportError:
from urllib.parse import urlsplit, urlunsplit from urllib.parse import urlsplit, urlunsplit
def cleanup_url_from_query_string(uri):
"""
Function used to clean up the uri from any query string, used i.e. by endpoints to validate redirect_uri
:param uri: URI to clean from query string
:type uri: str
:return: cleaned URI without query string
"""
clean_uri = urlsplit(uri)
clean_uri = urlunsplit(clean_uri._replace(query=''))
return clean_uri
def redirect(uri): def redirect(uri):
""" """
Custom Response object for redirecting to a Non-HTTP url scheme. Custom Response object for redirecting to a Non-HTTP url scheme.

View file

@ -1,3 +1,5 @@
from oidc_provider.lib.errors import RedirectUriError
try: try:
from urllib.parse import urlencode from urllib.parse import urlencode
except ImportError: except ImportError:
@ -249,9 +251,13 @@ class AuthorizationCodeFlowTestCase(TestCase, AuthorizeEndpointMixin):
self.assertEqual(is_code_ok, True, msg='Code returned is invalid or missing.') self.assertEqual(is_code_ok, True, msg='Code returned is invalid or missing.')
def test_response_uri_is_properly_constructed(self): def test_response_uri_is_properly_constructed(self):
"""
Check that the redirect_uri matches the one configured for the client.
Only 'state' and 'code' should be appended.
"""
data = { data = {
'client_id': self.client.client_id, 'client_id': self.client.client_id,
'redirect_uri': self.client.default_redirect_uri + "?redirect_state=xyz", 'redirect_uri': self.client.default_redirect_uri,
'response_type': 'code', 'response_type': 'code',
'scope': 'openid email', 'scope': 'openid email',
'state': self.state, 'state': self.state,
@ -260,7 +266,51 @@ class AuthorizationCodeFlowTestCase(TestCase, AuthorizeEndpointMixin):
response = self._auth_request('post', data, is_user_authenticated=True) response = self._auth_request('post', data, is_user_authenticated=True)
# TODO parsed = urlsplit(response['Location'])
params = parse_qs(parsed.query or parsed.fragment)
state = params['state'][0]
self.assertEquals(self.state, state, msg="State returned is invalid or missing")
is_code_ok = is_code_valid(url=response['Location'],
user=self.user,
client=self.client)
self.assertTrue(is_code_ok, msg='Code returned is invalid or missing')
self.assertEquals(set(params.keys()), set(['state', 'code']), msg='More than state or code appended as query params')
self.assertTrue(response['Location'].startswith(self.client.default_redirect_uri), msg='Different redirect_uri returned')
def test_unknown_redirect_uris_are_rejected(self):
"""
If a redirect_uri is not registered with the client the request must be rejected.
See http://openid.net/specs/openid-connect-core-1_0.html#AuthRequest.
"""
data = {
'client_id': self.client.client_id,
'response_type': 'code',
'redirect_uri': 'http://neverseenthis.com',
'scope': 'openid email',
'state': self.state,
}
response = self._auth_request('get', data)
self.assertIn(RedirectUriError.error, response.content.decode('utf-8'), msg='No redirect_uri error')
def test_manipulated_redirect_uris_are_rejected(self):
"""
If a redirect_uri does not exactly match the registered uri it must be rejected.
See http://openid.net/specs/openid-connect-core-1_0.html#AuthRequest.
"""
data = {
'client_id': self.client.client_id,
'response_type': 'code',
'redirect_uri': self.client.default_redirect_uri + "?some=query",
'scope': 'openid email',
'state': self.state,
}
response = self._auth_request('get', data)
self.assertIn(RedirectUriError.error, response.content.decode('utf-8'), msg='No redirect_uri error')
def test_public_client_auto_approval(self): def test_public_client_auto_approval(self):
""" """

View file

@ -339,12 +339,12 @@ class TokenTestCase(TestCase):
response = self._post_request(post_data) response = self._post_request(post_data)
self.assertIn('invalid_grant', response.content.decode('utf-8')) self.assertIn('invalid_grant', response.content.decode('utf-8'))
def test_client_redirect_url(self): def test_client_redirect_uri(self):
""" """
Validate that client redirect URIs with query strings match registered Validate that client redirect URIs exactly match registered
URIs, and that unregistered URIs are rejected. URIs, and that unregistered URIs or URIs with query parameters are rejected.
See http://openid.net/specs/openid-connect-core-1_0.html#AuthRequest and
source: https://github.com/jerrykan/django-oidc-provider/blob/2f54e537666c689dd8448f8bbc6a3a0244b01a97/oidc_provider/tests/test_token_endpoint.py http://openid.net/specs/openid-connect-core-1_0.html#HybridTokenRequest.
""" """
SIGKEYS = self._get_keys() SIGKEYS = self._get_keys()
code = self._create_code() code = self._create_code()
@ -354,15 +354,19 @@ class TokenTestCase(TestCase):
post_data['redirect_uri'] = 'http://invalid.example.org' post_data['redirect_uri'] = 'http://invalid.example.org'
response = self._post_request(post_data) response = self._post_request(post_data)
self.assertIn('invalid_client', response.content.decode('utf-8'))
self.assertIn('invalid_client', response.content.decode('utf-8')), # Registered URI, but with query string appended
post_data['redirect_uri'] = self.client.default_redirect_uri + '?foo=bar'
# Registered URI contained a query string
post_data['redirect_uri'] = 'http://example.com/?client=OidcClient'
response = self._post_request(post_data) response = self._post_request(post_data)
self.assertIn('invalid_client', response.content.decode('utf-8'))
self.assertNotIn('invalid_client', response.content.decode('utf-8')), # Registered URI
post_data['redirect_uri'] = self.client.default_redirect_uri
response = self._post_request(post_data)
self.assertNotIn('invalid_client', response.content.decode('utf-8'))
def test_request_methods(self): def test_request_methods(self):
""" """
@ -440,29 +444,6 @@ class TokenTestCase(TestCase):
False, False,
msg='Client authentication fails using HTTP Basic Auth.') msg='Client authentication fails using HTTP Basic Auth.')
def test_client_redirect_url(self):
"""
Validate that client redirect URIs with query strings match registered
URIs, and that unregistered URIs are rejected.
"""
SIGKEYS = self._get_keys()
code = self._create_code()
post_data = self._auth_code_post_data(code=code.code)
# Unregistered URI
post_data['redirect_uri'] = 'http://invalid.example.org'
response = self._post_request(post_data)
self.assertIn('invalid_client', response.content.decode('utf-8')),
# Registered URI contained a query string
post_data['redirect_uri'] = 'http://example.com/?client=OidcClient'
response = self._post_request(post_data)
self.assertNotIn('invalid_client', response.content.decode('utf-8')),
def test_access_token_contains_nonce(self): def test_access_token_contains_nonce(self):
""" """
If present in the Authentication Request, Authorization Servers MUST If present in the Authentication Request, Authorization Servers MUST