diff --git a/cas_server/federate.py b/cas_server/federate.py index 2cfd90e..d977771 100644 --- a/cas_server/federate.py +++ b/cas_server/federate.py @@ -42,13 +42,13 @@ class CASFederateValidateUser(object): #: the identity provider provider = None - def __init__(self, provider, service_url): + def __init__(self, provider, service_url, renew=False): self.provider = provider self.client = CASClient( service_url=service_url, version=provider.cas_protocol_version, server_url=provider.server_url, - renew=False, + renew=renew, ) def get_login_url(self): diff --git a/cas_server/tests/test_federate.py b/cas_server/tests/test_federate.py index b6fa3f9..cfcc5b7 100644 --- a/cas_server/tests/test_federate.py +++ b/cas_server/tests/test_federate.py @@ -84,6 +84,10 @@ class FederateAuthLoginLogoutTestCase( params['provider'] = provider.suffix if remember: params['remember'] = 'on' + # just try for one suffix + if suffix == "example.com": + # if renew=False is posted it should be ignored + params["renew"] = False # post the choosed provider response = client.post('/federate', params) # we are redirected to the provider CAS client url @@ -351,6 +355,76 @@ class FederateAuthLoginLogoutTestCase( provider.suffix )) + def test_forget_provider(self): + """Test the logout option to forget remembered provider""" + tickets = self.test_login_post_provider(remember=True) + for (provider, _, client) in tickets: + self.assertIn("remember_provider", client.cookies) + self.assertEqual(client.cookies["remember_provider"].value, provider.suffix) + self.assertNotEqual(client.cookies["remember_provider"]["max-age"], 0) + client.get("/logout?forget_provider=1") + self.assertEqual(client.cookies["remember_provider"]["max-age"], 0) + + def test_renew(self): + """ + Test authentication renewal with federation mode + """ + tickets = self.test_login_post_provider() + for (provider, _, client) in tickets: + # Try to renew authentication(client already authenticated in test_login_post_provider + response = client.get("/login?renew=true") + # we should be redirected to the user CAS + self.assertEqual(response.status_code, 302) + self.assertEqual(response["Location"], "%s/federate/%s?renew=true" % ( + 'http://testserver' if django.VERSION < (1, 9) else "", + provider.suffix + )) + + response = client.get("/federate/%s?renew=true" % provider.suffix) + self.assertEqual(response.status_code, 302) + service_url = ( + "service=http%%3A%%2F%%2Ftestserver%%2Ffederate%%2F%s%%3Frenew%%3Dtrue" + ) % provider.suffix + self.assertIn(service_url, response["Location"]) + self.assertIn("renew=true", response["Location"]) + + cas_port = int(provider.server_url.split(':')[-1]) + # let's generate a ticket + ticket = utils.gen_st() + # we lauch a dummy CAS server that only validate once for the service + # http://testserver/federate/example.com?renew=true with `ticket` + tests_utils.DummyCAS.run( + ("http://testserver/federate/%s?renew=true" % provider.suffix).encode("ascii"), + ticket.encode("ascii"), + settings.CAS_TEST_USER.encode("utf8"), + [], + cas_port + ) + # we normally provide a good ticket and should be redirected to /login as the ticket + # get successfully validated again the dummy CAS + response = client.get( + '/federate/%s' % provider.suffix, + {'ticket': ticket, 'renew': 'true'} + ) + self.assertEqual(response.status_code, 302) + self.assertEqual(response["Location"], "%s/login?renew=true" % ( + 'http://testserver' if django.VERSION < (1, 9) else "" + )) + # follow the redirect and try to get a ticket to see is it has renew set to True + response = client.get("/login?renew=true&service=%s" % self.service) + # we should get a page with a from with all widget hidden that auto POST to /login using + # javascript. If javascript is disabled, a "connect" button is showed + self.assertTrue(response.context['auto_submit']) + self.assertEqual(response.context['post_url'], '/login') + params = tests_utils.copy_form(response.context["form"]) + # POST get prefiled from parameters + response = client.post("/login", params) + self.assertEqual(response.status_code, 302) + self.assertTrue(response["Location"].startswith("%s?ticket=" % self.service)) + ticket_value = response["Location"].split('ticket=')[-1] + ticket = models.ServiceTicket.objects.get(value=ticket_value) + self.assertTrue(ticket.renew) + def test_login_bad_ticket(self): """ Try login with a bad ticket: diff --git a/cas_server/views.py b/cas_server/views.py index 04de6d3..e810f54 100644 --- a/cas_server/views.py +++ b/cas_server/views.py @@ -212,6 +212,7 @@ class LogoutView(View, LogoutMixin): class FederateAuth(View): """view to authenticated user agains a backend CAS then CAS_FEDERATE is True""" + @method_decorator(csrf_exempt) # csrf is disabled for allowing SLO requests reception def dispatch(self, request, *args, **kwargs): """ @@ -221,7 +222,7 @@ class FederateAuth(View): """ return super(FederateAuth, self).dispatch(request, *args, **kwargs) - def get_cas_client(self, request, provider): + def get_cas_client(self, request, provider, renew=False): """ return a CAS client object matching provider @@ -234,7 +235,7 @@ class FederateAuth(View): # compute the current url, ignoring ticket dans provider GET parameters service_url = utils.get_current_url(request, {"ticket", "provider"}) self.service_url = service_url - return CASFederateValidateUser(provider, service_url) + return CASFederateValidateUser(provider, service_url, renew=renew) def post(self, request, provider=None): """ @@ -291,16 +292,17 @@ class FederateAuth(View): if not settings.CAS_FEDERATE: logger.warning("CAS_FEDERATE is False, set it to True to use the federated mode") return redirect("cas_server:login") + renew = bool(request.GET.get('renew') and request.GET['renew'] != "False") # Is the user is already authenticated, no need to request authentication to the user # identity provider. - if self.request.session.get("authenticated"): + if self.request.session.get("authenticated") and not renew: logger.warning("User already authenticated, dropping federate authentication request") return redirect("cas_server:login") try: # get the identity provider from its suffix provider = FederatedIendityProvider.objects.get(suffix=provider) # get a CAS client for the user identity provider - auth = self.get_cas_client(request, provider) + auth = self.get_cas_client(request, provider, renew) # if no ticket submited, redirect to the identity provider CAS login page if 'ticket' not in request.GET: logger.info("Trying to authenticate again %s" % auth.provider.server_url) @@ -871,6 +873,24 @@ class LoginView(View, LogoutMixin): ) return HttpResponseRedirect(url) else: + # if user is authenticated and auth renewal is requested, redirect directly + # to the user identity provider + if self.renew and self.request.session.get("authenticated"): + try: + user = FederatedUser.get_from_federated_username( + self.request.session.get("username") + ) + params = utils.copy_params(self.request.GET) + url = utils.reverse_params( + "cas_server:federateAuth", + params=params, + kwargs=dict(provider=user.provider.suffix) + ) + return HttpResponseRedirect(url) + # Should normally not happen: if the user is logged, it exists in the + # database. + except FederatedUser.DoesNotExist: # pragma: no cover + pass return render( self.request, settings.CAS_LOGIN_TEMPLATE,