Support authentication renewal in federate mode
This commit is contained in:
parent
ed3e382ef1
commit
c6583c925e
3 changed files with 100 additions and 6 deletions
|
@ -42,13 +42,13 @@ class CASFederateValidateUser(object):
|
||||||
#: the identity provider
|
#: the identity provider
|
||||||
provider = None
|
provider = None
|
||||||
|
|
||||||
def __init__(self, provider, service_url):
|
def __init__(self, provider, service_url, renew=False):
|
||||||
self.provider = provider
|
self.provider = provider
|
||||||
self.client = CASClient(
|
self.client = CASClient(
|
||||||
service_url=service_url,
|
service_url=service_url,
|
||||||
version=provider.cas_protocol_version,
|
version=provider.cas_protocol_version,
|
||||||
server_url=provider.server_url,
|
server_url=provider.server_url,
|
||||||
renew=False,
|
renew=renew,
|
||||||
)
|
)
|
||||||
|
|
||||||
def get_login_url(self):
|
def get_login_url(self):
|
||||||
|
|
|
@ -84,6 +84,10 @@ class FederateAuthLoginLogoutTestCase(
|
||||||
params['provider'] = provider.suffix
|
params['provider'] = provider.suffix
|
||||||
if remember:
|
if remember:
|
||||||
params['remember'] = 'on'
|
params['remember'] = 'on'
|
||||||
|
# just try for one suffix
|
||||||
|
if suffix == "example.com":
|
||||||
|
# if renew=False is posted it should be ignored
|
||||||
|
params["renew"] = False
|
||||||
# post the choosed provider
|
# post the choosed provider
|
||||||
response = client.post('/federate', params)
|
response = client.post('/federate', params)
|
||||||
# we are redirected to the provider CAS client url
|
# we are redirected to the provider CAS client url
|
||||||
|
@ -351,6 +355,76 @@ class FederateAuthLoginLogoutTestCase(
|
||||||
provider.suffix
|
provider.suffix
|
||||||
))
|
))
|
||||||
|
|
||||||
|
def test_forget_provider(self):
|
||||||
|
"""Test the logout option to forget remembered provider"""
|
||||||
|
tickets = self.test_login_post_provider(remember=True)
|
||||||
|
for (provider, _, client) in tickets:
|
||||||
|
self.assertIn("remember_provider", client.cookies)
|
||||||
|
self.assertEqual(client.cookies["remember_provider"].value, provider.suffix)
|
||||||
|
self.assertNotEqual(client.cookies["remember_provider"]["max-age"], 0)
|
||||||
|
client.get("/logout?forget_provider=1")
|
||||||
|
self.assertEqual(client.cookies["remember_provider"]["max-age"], 0)
|
||||||
|
|
||||||
|
def test_renew(self):
|
||||||
|
"""
|
||||||
|
Test authentication renewal with federation mode
|
||||||
|
"""
|
||||||
|
tickets = self.test_login_post_provider()
|
||||||
|
for (provider, _, client) in tickets:
|
||||||
|
# Try to renew authentication(client already authenticated in test_login_post_provider
|
||||||
|
response = client.get("/login?renew=true")
|
||||||
|
# we should be redirected to the user CAS
|
||||||
|
self.assertEqual(response.status_code, 302)
|
||||||
|
self.assertEqual(response["Location"], "%s/federate/%s?renew=true" % (
|
||||||
|
'http://testserver' if django.VERSION < (1, 9) else "",
|
||||||
|
provider.suffix
|
||||||
|
))
|
||||||
|
|
||||||
|
response = client.get("/federate/%s?renew=true" % provider.suffix)
|
||||||
|
self.assertEqual(response.status_code, 302)
|
||||||
|
service_url = (
|
||||||
|
"service=http%%3A%%2F%%2Ftestserver%%2Ffederate%%2F%s%%3Frenew%%3Dtrue"
|
||||||
|
) % provider.suffix
|
||||||
|
self.assertIn(service_url, response["Location"])
|
||||||
|
self.assertIn("renew=true", response["Location"])
|
||||||
|
|
||||||
|
cas_port = int(provider.server_url.split(':')[-1])
|
||||||
|
# let's generate a ticket
|
||||||
|
ticket = utils.gen_st()
|
||||||
|
# we lauch a dummy CAS server that only validate once for the service
|
||||||
|
# http://testserver/federate/example.com?renew=true with `ticket`
|
||||||
|
tests_utils.DummyCAS.run(
|
||||||
|
("http://testserver/federate/%s?renew=true" % provider.suffix).encode("ascii"),
|
||||||
|
ticket.encode("ascii"),
|
||||||
|
settings.CAS_TEST_USER.encode("utf8"),
|
||||||
|
[],
|
||||||
|
cas_port
|
||||||
|
)
|
||||||
|
# we normally provide a good ticket and should be redirected to /login as the ticket
|
||||||
|
# get successfully validated again the dummy CAS
|
||||||
|
response = client.get(
|
||||||
|
'/federate/%s' % provider.suffix,
|
||||||
|
{'ticket': ticket, 'renew': 'true'}
|
||||||
|
)
|
||||||
|
self.assertEqual(response.status_code, 302)
|
||||||
|
self.assertEqual(response["Location"], "%s/login?renew=true" % (
|
||||||
|
'http://testserver' if django.VERSION < (1, 9) else ""
|
||||||
|
))
|
||||||
|
# follow the redirect and try to get a ticket to see is it has renew set to True
|
||||||
|
response = client.get("/login?renew=true&service=%s" % self.service)
|
||||||
|
# we should get a page with a from with all widget hidden that auto POST to /login using
|
||||||
|
# javascript. If javascript is disabled, a "connect" button is showed
|
||||||
|
self.assertTrue(response.context['auto_submit'])
|
||||||
|
self.assertEqual(response.context['post_url'], '/login')
|
||||||
|
params = tests_utils.copy_form(response.context["form"])
|
||||||
|
# POST get prefiled from parameters
|
||||||
|
response = client.post("/login", params)
|
||||||
|
self.assertEqual(response.status_code, 302)
|
||||||
|
self.assertTrue(response["Location"].startswith("%s?ticket=" % self.service))
|
||||||
|
ticket_value = response["Location"].split('ticket=')[-1]
|
||||||
|
ticket = models.ServiceTicket.objects.get(value=ticket_value)
|
||||||
|
self.assertTrue(ticket.renew)
|
||||||
|
|
||||||
def test_login_bad_ticket(self):
|
def test_login_bad_ticket(self):
|
||||||
"""
|
"""
|
||||||
Try login with a bad ticket:
|
Try login with a bad ticket:
|
||||||
|
|
|
@ -212,6 +212,7 @@ class LogoutView(View, LogoutMixin):
|
||||||
|
|
||||||
class FederateAuth(View):
|
class FederateAuth(View):
|
||||||
"""view to authenticated user agains a backend CAS then CAS_FEDERATE is True"""
|
"""view to authenticated user agains a backend CAS then CAS_FEDERATE is True"""
|
||||||
|
|
||||||
@method_decorator(csrf_exempt) # csrf is disabled for allowing SLO requests reception
|
@method_decorator(csrf_exempt) # csrf is disabled for allowing SLO requests reception
|
||||||
def dispatch(self, request, *args, **kwargs):
|
def dispatch(self, request, *args, **kwargs):
|
||||||
"""
|
"""
|
||||||
|
@ -221,7 +222,7 @@ class FederateAuth(View):
|
||||||
"""
|
"""
|
||||||
return super(FederateAuth, self).dispatch(request, *args, **kwargs)
|
return super(FederateAuth, self).dispatch(request, *args, **kwargs)
|
||||||
|
|
||||||
def get_cas_client(self, request, provider):
|
def get_cas_client(self, request, provider, renew=False):
|
||||||
"""
|
"""
|
||||||
return a CAS client object matching provider
|
return a CAS client object matching provider
|
||||||
|
|
||||||
|
@ -234,7 +235,7 @@ class FederateAuth(View):
|
||||||
# compute the current url, ignoring ticket dans provider GET parameters
|
# compute the current url, ignoring ticket dans provider GET parameters
|
||||||
service_url = utils.get_current_url(request, {"ticket", "provider"})
|
service_url = utils.get_current_url(request, {"ticket", "provider"})
|
||||||
self.service_url = service_url
|
self.service_url = service_url
|
||||||
return CASFederateValidateUser(provider, service_url)
|
return CASFederateValidateUser(provider, service_url, renew=renew)
|
||||||
|
|
||||||
def post(self, request, provider=None):
|
def post(self, request, provider=None):
|
||||||
"""
|
"""
|
||||||
|
@ -291,16 +292,17 @@ class FederateAuth(View):
|
||||||
if not settings.CAS_FEDERATE:
|
if not settings.CAS_FEDERATE:
|
||||||
logger.warning("CAS_FEDERATE is False, set it to True to use the federated mode")
|
logger.warning("CAS_FEDERATE is False, set it to True to use the federated mode")
|
||||||
return redirect("cas_server:login")
|
return redirect("cas_server:login")
|
||||||
|
renew = bool(request.GET.get('renew') and request.GET['renew'] != "False")
|
||||||
# Is the user is already authenticated, no need to request authentication to the user
|
# Is the user is already authenticated, no need to request authentication to the user
|
||||||
# identity provider.
|
# identity provider.
|
||||||
if self.request.session.get("authenticated"):
|
if self.request.session.get("authenticated") and not renew:
|
||||||
logger.warning("User already authenticated, dropping federate authentication request")
|
logger.warning("User already authenticated, dropping federate authentication request")
|
||||||
return redirect("cas_server:login")
|
return redirect("cas_server:login")
|
||||||
try:
|
try:
|
||||||
# get the identity provider from its suffix
|
# get the identity provider from its suffix
|
||||||
provider = FederatedIendityProvider.objects.get(suffix=provider)
|
provider = FederatedIendityProvider.objects.get(suffix=provider)
|
||||||
# get a CAS client for the user identity provider
|
# get a CAS client for the user identity provider
|
||||||
auth = self.get_cas_client(request, provider)
|
auth = self.get_cas_client(request, provider, renew)
|
||||||
# if no ticket submited, redirect to the identity provider CAS login page
|
# if no ticket submited, redirect to the identity provider CAS login page
|
||||||
if 'ticket' not in request.GET:
|
if 'ticket' not in request.GET:
|
||||||
logger.info("Trying to authenticate again %s" % auth.provider.server_url)
|
logger.info("Trying to authenticate again %s" % auth.provider.server_url)
|
||||||
|
@ -871,6 +873,24 @@ class LoginView(View, LogoutMixin):
|
||||||
)
|
)
|
||||||
return HttpResponseRedirect(url)
|
return HttpResponseRedirect(url)
|
||||||
else:
|
else:
|
||||||
|
# if user is authenticated and auth renewal is requested, redirect directly
|
||||||
|
# to the user identity provider
|
||||||
|
if self.renew and self.request.session.get("authenticated"):
|
||||||
|
try:
|
||||||
|
user = FederatedUser.get_from_federated_username(
|
||||||
|
self.request.session.get("username")
|
||||||
|
)
|
||||||
|
params = utils.copy_params(self.request.GET)
|
||||||
|
url = utils.reverse_params(
|
||||||
|
"cas_server:federateAuth",
|
||||||
|
params=params,
|
||||||
|
kwargs=dict(provider=user.provider.suffix)
|
||||||
|
)
|
||||||
|
return HttpResponseRedirect(url)
|
||||||
|
# Should normally not happen: if the user is logged, it exists in the
|
||||||
|
# database.
|
||||||
|
except FederatedUser.DoesNotExist: # pragma: no cover
|
||||||
|
pass
|
||||||
return render(
|
return render(
|
||||||
self.request,
|
self.request,
|
||||||
settings.CAS_LOGIN_TEMPLATE,
|
settings.CAS_LOGIN_TEMPLATE,
|
||||||
|
|
Loading…
Reference in a new issue