From 0776e371e8df90f19338650c6e5277bae8af0da7 Mon Sep 17 00:00:00 2001 From: Valentin Samir Date: Fri, 24 Jun 2016 21:23:33 +0200 Subject: [PATCH] style --- cas_server/default_settings.py | 5 +- cas_server/tests.py | 277 ++++++++++++++++++++++++++------- cas_server/utils.py | 9 +- 3 files changed, 229 insertions(+), 62 deletions(-) diff --git a/cas_server/default_settings.py b/cas_server/default_settings.py index 2c421d7..2824991 100644 --- a/cas_server/default_settings.py +++ b/cas_server/default_settings.py @@ -76,4 +76,7 @@ setting_default('CAS_SQL_PASSWORD_CHECK', 'crypt') # crypt or plain setting_default('CAS_TEST_USER', 'test') setting_default('CAS_TEST_PASSWORD', 'test') -setting_default('CAS_TEST_ATTRIBUTES', {'nom': 'Nymous', 'prenom': 'Ano', 'email': 'anonymous@example.net'}) +setting_default( + 'CAS_TEST_ATTRIBUTES', + {'nom': 'Nymous', 'prenom': 'Ano', 'email': 'anonymous@example.net'} +) diff --git a/cas_server/tests.py b/cas_server/tests.py index 75683e6..b989ee6 100644 --- a/cas_server/tests.py +++ b/cas_server/tests.py @@ -4,11 +4,11 @@ from django.test import TestCase from django.test import Client from lxml import etree -import BaseHTTPServer import models import utils + def get_login_page_params(): client = Client() response = client.get('/login') 
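Review bot: In the `default_settings.py` hunk, the context lines just above the re-wrapped `CAS_TEST_ATTRIBUTES` define a fixed `test`/`test` credential pair (`CAS_TEST_USER`, `CAS_TEST_PASSWORD`) with no comment saying when it is honoured. Which authentication backend reads these values is not visible in this hunk; if that backend can be selected in a deployed configuration, the defaults amount to a publicly known login. I would add a comment above them, for example `# Fixed credentials for test runs only; do not enable the backend that reads them in production.`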
@@ -21,24 +21,28 @@ def get_login_page_params(): params[field.name] = "" return client, params + def get_auth_client(): client, params = get_login_page_params() params["username"] = settings.CAS_TEST_USER params["password"] = settings.CAS_TEST_PASSWORD - response = client.post('/login', params) + client.post('/login', params) return client + def get_user_ticket_request(service): client = get_auth_client() response = client.get("/login", {"service": service}) ticket_value = response['Location'].split('ticket=')[-1] - user = models.User.objects.get(username=settings.CAS_TEST_USER, session_key=client.session.session_key) + user = models.User.objects.get( + username=settings.CAS_TEST_USER, + session_key=client.session.session_key + ) ticket = models.ServiceTicket.objects.get(value=ticket_value) return (user, ticket) - def get_pgt(): (httpd_thread, host, port) = utils.PGTUrlHandler.run() service = "http://%s:%s" % (host, port) @@ -46,7 +50,7 @@ def get_pgt(): (user, ticket) = get_user_ticket_request(service) client = Client() - response = client.get('/serviceValidate', {'ticket': ticket.value, 'service': service, 'pgtUrl': service}) + client.get('/serviceValidate', {'ticket': ticket.value, 'service': service, 'pgtUrl': service}) params = utils.PGTUrlHandler.PARAMS.copy() params["service"] = service @@ -54,6 +58,7 @@ def get_pgt(): return params + class LoginTestCase(TestCase): def setUp(self): 
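Review bot: In `get_auth_client` (`@@ -21,24 +21,28 @@`), dropping `response =` silences the unused-variable warning but leaves the helper returning a client whether or not the login worked, so a broken login only surfaces later as an unrelated failure in whichever test uses the client. A status check is not enough, because the bad-password test elsewhere in this file also expects 200; a check on the session would be, for example `assert models.User.objects.filter(username=settings.CAS_TEST_USER, session_key=client.session.session_key).exists()`. The same applies to the discarded `/serviceValidate` response in `get_pgt` in the next hunk (`@@ -46,7 +50,7 @@`).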
@@ -72,10 +77,19 @@ class LoginTestCase(TestCase): response = client.post('/login', params) self.assertEqual(response.status_code, 200) - self.assertTrue("You have successfully logged into the Central Authentication Service" in response.content) - - self.assertTrue(models.User.objects.get(username=settings.CAS_TEST_USER, session_key=client.session.session_key)) + self.assertTrue( + ( + "You have successfully logged into " + "the Central Authentication Service" + ) in response.content + ) + self.assertTrue( + models.User.objects.get( + username=settings.CAS_TEST_USER, + session_key=client.session.session_key + ) + ) def test_login_view_post_badlt(self): client, params = get_login_page_params() @@ -87,8 +101,12 @@ class LoginTestCase(TestCase): self.assertEqual(response.status_code, 200) self.assertTrue("Invalid login ticket" in response.content) - self.assertFalse("You have successfully logged into the Central Authentication Service" in response.content) - + self.assertFalse( + ( + "You have successfully logged into " + "the Central Authentication Service" + ) in response.content + ) def test_login_view_post_badpass_good_lt(self): client, params = get_login_page_params() 
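Review bot: The re-wrapped success-message assertions in `@@ -72,10 +77,19 @@` still use `assertTrue(<literal> in response.content)`, which reports only that `False` is not true when it fails, and the same two-part literal is repeated in the hunks at `-87`, `-97`, `-134`, `-154` and `-166`. A module-level constant with `assertIn`/`assertNotIn` would be shorter and would print the searched text on failure: `LOGIN_OK = "You have successfully logged into the Central Authentication Service"` and `self.assertIn(LOGIN_OK, response.content)`. Separately, `assertTrue(models.User.objects.get(...))` adds little as an assertion, since `get` raises when no row matches; calling `get` on its own line states the intent more plainly.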
@@ -97,19 +115,35 @@ class LoginTestCase(TestCase): response = client.post('/login', params) self.assertEqual(response.status_code, 200) - self.assertTrue(" The credentials you provided cannot be determined to be authentic" in response.content) - self.assertFalse("You have successfully logged into the Central Authentication Service" in response.content) - + self.assertTrue( + ( + "The credentials you provided cannot be " + "determined to be authentic" + ) in response.content + ) + self.assertFalse( + ( + "You have successfully logged into " + "the Central Authentication Service" + ) in response.content + ) def test_view_login_get_auth_allowed_service(self): client = get_auth_client() response = client.get("/login?service=https://www.example.com") self.assertEqual(response.status_code, 302) self.assertTrue(response.has_header('Location')) - self.assertTrue(response['Location'].startswith("https://www.example.com?ticket=%s-" % settings.CAS_SERVICE_TICKET_PREFIX)) + self.assertTrue( + response['Location'].startswith( + "https://www.example.com?ticket=%s-" % settings.CAS_SERVICE_TICKET_PREFIX + ) + ) ticket_value = response['Location'].split('ticket=')[-1] - user = models.User.objects.get(username=settings.CAS_TEST_USER, session_key=client.session.session_key) + user = models.User.objects.get( + username=settings.CAS_TEST_USER, + session_key=client.session.session_key + ) self.assertTrue(user) ticket = models.ServiceTicket.objects.get(value=ticket_value) self.assertEqual(ticket.user, user) 
@@ -134,15 +168,30 @@ class LogoutTestCase(TestCase): response = client.get("/login") self.assertEqual(response.status_code, 200) - self.assertTrue("You have successfully logged into the Central Authentication Service" in response.content) + self.assertTrue( + ( + "You have successfully logged into " + "the Central Authentication Service" + ) in response.content + ) response = client.get("/logout") self.assertEqual(response.status_code, 200) - self.assertTrue("You have successfully logged out from the Central Authentication Service" in response.content) + self.assertTrue( + ( + "You have successfully logged out from " + "the Central Authentication Service" + ) in response.content + ) response = client.get("/login") self.assertEqual(response.status_code, 200) - self.assertFalse("You have successfully logged into the Central Authentication Service" in response.content) + self.assertFalse( + ( + "You have successfully logged into " + "the Central Authentication Service" + ) in response.content + ) def test_logout_view_url(self): client = get_auth_client() @@ -154,7 +203,12 @@ class LogoutTestCase(TestCase): response = client.get("/login") self.assertEqual(response.status_code, 200) - self.assertFalse("You have successfully logged into the Central Authentication Service" in response.content) + self.assertFalse( + ( + "You have successfully logged into " + "the Central Authentication Service" + ) in response.content + ) def test_logout_view_service(self): client = get_auth_client() @@ -166,11 +220,12 @@ class LogoutTestCase(TestCase): response = client.get("/login") self.assertEqual(response.status_code, 200) - self.assertFalse("You have successfully logged into the Central Authentication Service" in response.content) - - - open("/tmp/test.html", "w").write(response.content) - + self.assertFalse( + ( + "You have successfully logged into " + "the Central Authentication Service" + ) in response.content + ) class AuthTestCase(TestCase): 
@@ -186,35 +241,75 @@ class AuthTestCase(TestCase): def test_auth_view_goodpass(self): settings.CAS_AUTH_SHARED_SECRET = 'test' client = Client() - response = client.post('/auth', {'username':settings.CAS_TEST_USER, 'password':settings.CAS_TEST_PASSWORD, 'service':self.service, 'secret':'test'}) + response = client.post( + '/auth', + { + 'username': settings.CAS_TEST_USER, + 'password': settings.CAS_TEST_PASSWORD, + 'service': self.service, + 'secret': 'test' + } + ) self.assertEqual(response.status_code, 200) self.assertEqual(response.content, 'yes\n') def test_auth_view_badpass(self): settings.CAS_AUTH_SHARED_SECRET = 'test' client = Client() - response = client.post('/auth', {'username':settings.CAS_TEST_USER, 'password':'badpass', 'service':self.service, 'secret':'test'}) + response = client.post( + '/auth', + { + 'username': settings.CAS_TEST_USER, + 'password': 'badpass', + 'service': self.service, + 'secret': 'test' + } + ) self.assertEqual(response.status_code, 200) self.assertEqual(response.content, 'no\n') def test_auth_view_badservice(self): settings.CAS_AUTH_SHARED_SECRET = 'test' client = Client() - response = client.post('/auth', {'username':settings.CAS_TEST_USER, 'password':settings.CAS_TEST_PASSWORD, 'service':'https://www.example.org', 'secret':'test'}) + response = client.post( + '/auth', + { + 'username': settings.CAS_TEST_USER, + 'password': settings.CAS_TEST_PASSWORD, + 'service': 'https://www.example.org', + 'secret': 'test' + } + ) self.assertEqual(response.status_code, 200) self.assertEqual(response.content, 'no\n') def test_auth_view_badsecret(self): settings.CAS_AUTH_SHARED_SECRET = 'test' client = Client() - response = client.post('/auth', {'username':settings.CAS_TEST_USER, 'password':settings.CAS_TEST_PASSWORD, 'service':self.service, 'secret':'badsecret'}) + response = client.post( + '/auth', + { + 'username': settings.CAS_TEST_USER, + 'password': settings.CAS_TEST_PASSWORD, + 'service': self.service, + 'secret': 'badsecret' + } + ) 
self.assertEqual(response.status_code, 200) self.assertEqual(response.content, 'no\n') def test_auth_view_badsettings(self): settings.CAS_AUTH_SHARED_SECRET = None client = Client() - response = client.post('/auth', {'username':settings.CAS_TEST_USER, 'password':settings.CAS_TEST_PASSWORD, 'service':self.service, 'secret':'test'}) + response = client.post( + '/auth', + { + 'username': settings.CAS_TEST_USER, + 'password': settings.CAS_TEST_PASSWORD, + 'service': self.service, + 'secret': 'test' + } + ) self.assertEqual(response.status_code, 200) self.assertEqual(response.content, "no\nplease set CAS_AUTH_SHARED_SECRET") @@ -242,7 +337,10 @@ class ValidateTestCase(TestCase): (user, ticket) = get_user_ticket_request(self.service) client = Client() - response = client.get('/validate', {'ticket': ticket.value, 'service': "https://www.example.org"}) + response = client.get( + '/validate', + {'ticket': ticket.value, 'service': "https://www.example.org"} + ) self.assertEqual(response.status_code, 200) self.assertEqual(response.content, 'no\n') @@ -250,10 +348,14 @@ class ValidateTestCase(TestCase): (user, ticket) = get_user_ticket_request(self.service) client = Client() - response = client.get('/validate', {'ticket': "%s-RANDOM" % settings.CAS_SERVICE_TICKET_PREFIX, 'service': self.service}) + response = client.get( + '/validate', + {'ticket': "%s-RANDOM" % settings.CAS_SERVICE_TICKET_PREFIX, 'service': self.service} + ) self.assertEqual(response.status_code, 200) self.assertEqual(response.content, 'no\n') + class ValidateServiceTestCase(TestCase): def setUp(self): 
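Review bot: Every test in the `AuthTestCase` hunk (`@@ -186,35 +241,75 @@`) assigns `settings.CAS_AUTH_SHARED_SECRET` directly and nothing visible restores it, so the value left by whichever test ran last (including the `None` from `test_auth_view_badsettings`) stays in place for the rest of the test process. Django's `override_settings` restores the previous value automatically: decorate the tests with `@override_settings(CAS_AUTH_SHARED_SECRET='test')` (imported from `django.test.utils`), or wrap the body in `with self.settings(CAS_AUTH_SHARED_SECRET=None):`. The class body starts before this hunk, so a `setUp`/`tearDown` pair there may already cover part of this.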
@@ -274,18 +376,24 @@ class ValidateServiceTestCase(TestCase): self.assertEqual(response.status_code, 200) root = etree.fromstring(response.content) - sucess = root.xpath("//cas:authenticationSuccess", namespaces={'cas': "http://www.yale.edu/tp/cas"}) + sucess = root.xpath( + "//cas:authenticationSuccess", + namespaces={'cas': "http://www.yale.edu/tp/cas"} + ) self.assertTrue(sucess) users = root.xpath("//cas:user", namespaces={'cas': "http://www.yale.edu/tp/cas"}) self.assertEqual(len(users), 1) self.assertEqual(users[0].text, settings.CAS_TEST_USER) - attributes = root.xpath("//cas:attributes", namespaces={'cas': "http://www.yale.edu/tp/cas"}) + attributes = root.xpath( + "//cas:attributes", + namespaces={'cas': "http://www.yale.edu/tp/cas"} + ) self.assertEqual(len(attributes), 1) attrs1 = {} for attr in attributes[0]: - attrs1[attr.tag[len("http://www.yale.edu/tp/cas")+2:]]=attr.text + attrs1[attr.tag[len("http://www.yale.edu/tp/cas")+2:]] = attr.text attributes = root.xpath("//cas:attribute", namespaces={'cas': "http://www.yale.edu/tp/cas"}) self.assertEqual(len(attributes), len(attrs1)) @@ -304,7 +412,10 @@ class ValidateServiceTestCase(TestCase): self.assertEqual(response.status_code, 200) root = etree.fromstring(response.content) - error = root.xpath("//cas:authenticationFailure", namespaces={'cas': "http://www.yale.edu/tp/cas"}) + error = root.xpath( + "//cas:authenticationFailure", + namespaces={'cas': "http://www.yale.edu/tp/cas"} + ) self.assertEqual(len(error), 1) self.assertEqual(error[0].attrib['code'], "INVALID_SERVICE") self.assertEqual(error[0].text, bad_service) 
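Review bot: The namespace mapping `{'cas': "http://www.yale.edu/tp/cas"}` is spelled out in every `xpath` call in `@@ -274,18 +376,24 @@` and again in the later `ValidateServiceTestCase` and `ProxyTestCase` hunks, which is what forces most of the new line wraps. A module-level `CAS_NS = {'cas': "http://www.yale.edu/tp/cas"}` would let each call fit on one line as `root.xpath("//cas:attributes", namespaces=CAS_NS)`. The slice `attr.tag[len("http://www.yale.edu/tp/cas")+2:]` could likewise become `etree.QName(attr).localname`, which does not depend on the length of the URI.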
@@ -318,7 +429,10 @@ class ValidateServiceTestCase(TestCase): self.assertEqual(response.status_code, 200) root = etree.fromstring(response.content) - error = root.xpath("//cas:authenticationFailure", namespaces={'cas': "http://www.yale.edu/tp/cas"}) + error = root.xpath( + "//cas:authenticationFailure", + namespaces={'cas': "http://www.yale.edu/tp/cas"} + ) self.assertEqual(len(error), 1) self.assertEqual(error[0].attrib['code'], "INVALID_TICKET") self.assertEqual(error[0].text, 'ticket not found') @@ -332,7 +446,10 @@ class ValidateServiceTestCase(TestCase): self.assertEqual(response.status_code, 200) root = etree.fromstring(response.content) - error = root.xpath("//cas:authenticationFailure", namespaces={'cas': "http://www.yale.edu/tp/cas"}) + error = root.xpath( + "//cas:authenticationFailure", + namespaces={'cas': "http://www.yale.edu/tp/cas"} + ) self.assertEqual(len(error), 1) self.assertEqual(error[0].attrib['code'], "INVALID_TICKET") self.assertEqual(error[0].text, bad_ticket) @@ -344,13 +461,18 @@ class ValidateServiceTestCase(TestCase): (user, ticket) = get_user_ticket_request(service) client = Client() - response = client.get('/serviceValidate', {'ticket': ticket.value, 'service': service, 'pgtUrl': service}) + response = client.get( + '/serviceValidate', + {'ticket': ticket.value, 'service': service, 'pgtUrl': service} + ) pgt_params = utils.PGTUrlHandler.PARAMS.copy() self.assertEqual(response.status_code, 200) - root = etree.fromstring(response.content) - pgtiou = root.xpath("//cas:proxyGrantingTicket", namespaces={'cas': "http://www.yale.edu/tp/cas"}) + pgtiou = root.xpath( + "//cas:proxyGrantingTicket", + namespaces={'cas': "http://www.yale.edu/tp/cas"} + ) self.assertEqual(len(pgtiou), 1) self.assertEqual(pgt_params["pgtIou"], pgtiou[0].text) self.assertTrue("pgtId" in pgt_params) 
@@ -361,15 +483,22 @@ class ValidateServiceTestCase(TestCase): (user, ticket) = get_user_ticket_request(self.service) client = Client() - response = client.get('/serviceValidate', {'ticket': ticket.value, 'service': self.service, 'pgtUrl': self.service}) + response = client.get( + '/serviceValidate', + {'ticket': ticket.value, 'service': self.service, 'pgtUrl': self.service} + ) self.assertEqual(response.status_code, 200) root = etree.fromstring(response.content) - error = root.xpath("//cas:authenticationFailure", namespaces={'cas': "http://www.yale.edu/tp/cas"}) + error = root.xpath( + "//cas:authenticationFailure", + namespaces={'cas': "http://www.yale.edu/tp/cas"} + ) self.assertEqual(len(error), 1) self.assertEqual(error[0].attrib['code'], "INVALID_PROXY_CALLBACK") self.assertEqual(error[0].text, "callback url not allowed by configuration") + class ProxyTestCase(TestCase): def setUp(self): @@ -383,7 +512,6 @@ class ProxyTestCase(TestCase): ) models.ReplaceAttributName.objects.create(name="*", service_pattern=self.service_pattern) - def test_validate_proxy_ok(self): params = get_pgt() 
@@ -396,18 +524,23 @@ class ProxyTestCase(TestCase): sucess = root.xpath("//cas:proxySuccess", namespaces={'cas': "http://www.yale.edu/tp/cas"}) self.assertTrue(sucess) - proxy_ticket = root.xpath("//cas:proxyTicket", namespaces={'cas': "http://www.yale.edu/tp/cas"}) + proxy_ticket = root.xpath( + "//cas:proxyTicket", + namespaces={'cas': "http://www.yale.edu/tp/cas"} + ) self.assertEqual(len(proxy_ticket), 1) proxy_ticket = proxy_ticket[0].text - # validate the proxy ticket client2 = Client() response = client2.get('/proxyValidate', {'ticket': proxy_ticket, 'service': self.service}) self.assertEqual(response.status_code, 200) root = etree.fromstring(response.content) - sucess = root.xpath("//cas:authenticationSuccess", namespaces={'cas': "http://www.yale.edu/tp/cas"}) + sucess = root.xpath( + "//cas:authenticationSuccess", + namespaces={'cas': "http://www.yale.edu/tp/cas"} + ) self.assertTrue(sucess) # check that the proxy is send to the end service @@ -422,11 +555,14 @@ class ProxyTestCase(TestCase): self.assertEqual(len(users), 1) self.assertEqual(users[0].text, settings.CAS_TEST_USER) - attributes = root.xpath("//cas:attributes", namespaces={'cas': "http://www.yale.edu/tp/cas"}) + attributes = root.xpath( + "//cas:attributes", + namespaces={'cas': "http://www.yale.edu/tp/cas"} + ) self.assertEqual(len(attributes), 1) attrs1 = {} for attr in attributes[0]: - attrs1[attr.tag[len("http://www.yale.edu/tp/cas")+2:]]=attr.text + attrs1[attr.tag[len("http://www.yale.edu/tp/cas")+2:]] = attr.text attributes = root.xpath("//cas:attribute", namespaces={'cas': "http://www.yale.edu/tp/cas"}) self.assertEqual(len(attributes), len(attrs1)) 
@@ -436,43 +572,68 @@ class ProxyTestCase(TestCase): self.assertEqual(attrs1, attrs2) self.assertEqual(attrs1, settings.CAS_TEST_ATTRIBUTES) - def test_validate_proxy_bad(self): params = get_pgt() # bad PGT client1 = Client() - response = client1.get('/proxy', {'pgt': "%s-RANDOM" % settings.CAS_PROXY_GRANTING_TICKET_PREFIX, 'targetService': params['service']}) + response = client1.get( + '/proxy', + { + 'pgt': "%s-RANDOM" % settings.CAS_PROXY_GRANTING_TICKET_PREFIX, + 'targetService': params['service'] + } + ) self.assertEqual(response.status_code, 200) root = etree.fromstring(response.content) - error = root.xpath("//cas:authenticationFailure", namespaces={'cas': "http://www.yale.edu/tp/cas"}) + error = root.xpath( + "//cas:authenticationFailure", + namespaces={'cas': "http://www.yale.edu/tp/cas"} + ) self.assertEqual(len(error), 1) self.assertEqual(error[0].attrib['code'], "INVALID_TICKET") - self.assertEqual(error[0].text, "PGT %s-RANDOM not found" % settings.CAS_PROXY_GRANTING_TICKET_PREFIX) + self.assertEqual( + error[0].text, + "PGT %s-RANDOM not found" % settings.CAS_PROXY_GRANTING_TICKET_PREFIX + ) # bad targetService client2 = Client() - response = client2.get('/proxy', {'pgt': params['pgtId'], 'targetService': "https://www.example.org"}) + response = client2.get( + '/proxy', + {'pgt': params['pgtId'], 'targetService': "https://www.example.org"} + ) self.assertEqual(response.status_code, 200) root = etree.fromstring(response.content) - error = root.xpath("//cas:authenticationFailure", namespaces={'cas': "http://www.yale.edu/tp/cas"}) + error = root.xpath( + "//cas:authenticationFailure", + namespaces={'cas': "http://www.yale.edu/tp/cas"} + ) self.assertEqual(len(error), 1) self.assertEqual(error[0].attrib['code'], "UNAUTHORIZED_SERVICE") self.assertEqual(error[0].text, "https://www.example.org") - # service do not allow proxy ticket self.service_pattern.proxy = False self.service_pattern.save() client3 = Client() - response = client3.get('/proxy', {'pgt': 
params['pgtId'], 'targetService': params['service']}) + response = client3.get( + '/proxy', + {'pgt': params['pgtId'], 'targetService': params['service']} + ) self.assertEqual(response.status_code, 200) root = etree.fromstring(response.content) - error = root.xpath("//cas:authenticationFailure", namespaces={'cas': "http://www.yale.edu/tp/cas"}) + error = root.xpath( + "//cas:authenticationFailure", + namespaces={'cas': "http://www.yale.edu/tp/cas"} + ) self.assertEqual(len(error), 1) self.assertEqual(error[0].attrib['code'], "UNAUTHORIZED_SERVICE") - self.assertEqual(error[0].text, 'the service %s do not allow proxy ticket' % params['service']) + self.assertEqual( + error[0].text, + 'the service %s do not allow proxy ticket' % params['service'] + ) diff --git a/cas_server/utils.py b/cas_server/utils.py index 69e5623..4db8f9e 100644 --- a/cas_server/utils.py +++ b/cas_server/utils.py @@ -149,7 +149,8 @@ def gen_saml_id(): class PGTUrlHandler(BaseHTTPServer.BaseHTTPRequestHandler): - PARAMS={} + PARAMS = {} + def do_GET(s): s.send_response(200) s.send_header("Content-type", "text/plain") @@ -159,6 +160,7 @@ class PGTUrlHandler(BaseHTTPServer.BaseHTTPRequestHandler): params = dict(parse_qsl(url.query)) PGTUrlHandler.PARAMS.update(params) s.wfile.write("%s" % params) + def log_message(self, format, *args): return @@ -166,11 +168,12 @@ class PGTUrlHandler(BaseHTTPServer.BaseHTTPRequestHandler): def run(): server_class = BaseHTTPServer.HTTPServer httpd = server_class(("127.0.0.1", 0), PGTUrlHandler) - (host, port) = httpd.socket.getsockname() + (host, port) = httpd.socket.getsockname() + def lauch(): httpd.handle_request() - #httpd.serve_forever() httpd.server_close() + httpd_thread = Thread(target=lauch) httpd_thread.daemon = True httpd_thread.start()
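Review bot: `PARAMS` in the `PGTUrlHandler` hunk (`@@ -149,7 +149,8 @@`) is a class-level dict that `do_GET` only ever updates, so values from an earlier callback remain visible to later readers in the same process. The tests read `utils.PGTUrlHandler.PARAMS.copy()` and assert that `pgtId` is present, so a run in which the callback never arrives could still find the key left by a previous test. Clearing the dict at the start of `run()` with `PGTUrlHandler.PARAMS.clear()` would make each callback server start from an empty state. Naming the first parameter of `do_GET` `self`, as `log_message` already does, would also read more consistently. The class continues outside the hunk.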
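Review bot: In `run()` (`@@ -166,11 +168,12 @@`) the worker calls `httpd.handle_request()` with no timeout, so if the callback is never made the thread stays blocked and `httpd.server_close()` is never reached. Setting `httpd.timeout` before starting the thread lets `handle_request()` return and the listening socket be closed; the value is a judgement call for the test suite. The inner function name `lauch` also looks like a typo for `launch`. The end of `run()` lies outside the hunk.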