Correct some code style errors and remove some forgotten debug lines
This commit is contained in:
parent
90daf3d2a0
commit
7cc3ba689f
4 changed files with 65 additions and 60 deletions
|
@ -34,10 +34,6 @@ PROVIDERS_LIST = list(PROVIDERS.keys())
|
||||||
PROVIDERS_LIST.sort()
|
PROVIDERS_LIST.sort()
|
||||||
|
|
||||||
|
|
||||||
def getaddrinfo_mock(name, port, *args, **kwargs):
|
|
||||||
return [(2, 1, 6, '', ('127.0.0.1', 80))]
|
|
||||||
|
|
||||||
|
|
||||||
@override_settings(
|
@override_settings(
|
||||||
CAS_FEDERATE=True,
|
CAS_FEDERATE=True,
|
||||||
CAS_FEDERATE_PROVIDERS=PROVIDERS,
|
CAS_FEDERATE_PROVIDERS=PROVIDERS,
|
||||||
|
@ -187,7 +183,6 @@ class FederateAuthLoginLogoutTestCase(TestCase, BaseServicePattern, CanLogin):
|
||||||
try to fetch a new ticket if the provided ticket validation fail
|
try to fetch a new ticket if the provided ticket validation fail
|
||||||
(network error or bad ticket)
|
(network error or bad ticket)
|
||||||
"""
|
"""
|
||||||
return
|
|
||||||
good_provider = "example.com"
|
good_provider = "example.com"
|
||||||
bad_provider = "exemple.fr"
|
bad_provider = "exemple.fr"
|
||||||
client = Client()
|
client = Client()
|
||||||
|
@ -285,7 +280,7 @@ class FederateAuthLoginLogoutTestCase(TestCase, BaseServicePattern, CanLogin):
|
||||||
test the logout function: the user should be log out
|
test the logout function: the user should be log out
|
||||||
and redirected to his CAS logout page
|
and redirected to his CAS logout page
|
||||||
"""
|
"""
|
||||||
# get tickets and connected clients
|
# get tickets and connected clients, then follow normal logout
|
||||||
tickets = self.test_login_post_provider()
|
tickets = self.test_login_post_provider()
|
||||||
for (provider, _, client) in tickets:
|
for (provider, _, client) in tickets:
|
||||||
response = client.get("/logout")
|
response = client.get("/logout")
|
||||||
|
@ -297,6 +292,28 @@ class FederateAuthLoginLogoutTestCase(TestCase, BaseServicePattern, CanLogin):
|
||||||
response = client.get("/login")
|
response = client.get("/login")
|
||||||
self.assert_login_failed(client, response)
|
self.assert_login_failed(client, response)
|
||||||
|
|
||||||
|
# test if the user is already logged out
|
||||||
|
response = client.get("/logout")
|
||||||
|
# no redirection
|
||||||
|
self.assertEqual(response.status_code, 200)
|
||||||
|
self.assertTrue(
|
||||||
|
(
|
||||||
|
b"You were already logged out from the Central Authentication Service."
|
||||||
|
) in response.content
|
||||||
|
)
|
||||||
|
|
||||||
|
tickets = self.test_login_post_provider()
|
||||||
|
if django.VERSION >= (1, 8):
|
||||||
|
# assume the username session variable has been tempered (should not happend)
|
||||||
|
for (provider, _, client) in tickets:
|
||||||
|
session = client.session
|
||||||
|
session["username"] = settings.CAS_TEST_USER
|
||||||
|
session.save()
|
||||||
|
response = client.get("/logout")
|
||||||
|
self.assertEqual(response.status_code, 200)
|
||||||
|
response = client.get("/login")
|
||||||
|
self.assert_login_failed(client, response)
|
||||||
|
|
||||||
def test_remember_provider(self):
|
def test_remember_provider(self):
|
||||||
"""
|
"""
|
||||||
If the user check remember, next login should not offer the chose of the backend CAS
|
If the user check remember, next login should not offer the chose of the backend CAS
|
||||||
|
@ -323,7 +340,7 @@ class FederateAuthLoginLogoutTestCase(TestCase, BaseServicePattern, CanLogin):
|
||||||
session = client.session
|
session = client.session
|
||||||
session["federate_username"] = '%s@%s' % (settings.CAS_TEST_USER, provider)
|
session["federate_username"] = '%s@%s' % (settings.CAS_TEST_USER, provider)
|
||||||
session["federate_ticket"] = utils.gen_st()
|
session["federate_ticket"] = utils.gen_st()
|
||||||
try:
|
if django.VERSION >= (1, 8):
|
||||||
session.save()
|
session.save()
|
||||||
response = client.get("/login")
|
response = client.get("/login")
|
||||||
# we should get a page with a from with all widget hidden that auto POST to /login using
|
# we should get a page with a from with all widget hidden that auto POST to /login using
|
||||||
|
@ -340,5 +357,3 @@ class FederateAuthLoginLogoutTestCase(TestCase, BaseServicePattern, CanLogin):
|
||||||
utils.get_tuple(value, 2, key)
|
utils.get_tuple(value, 2, key)
|
||||||
) in response.content.decode("utf-8"))
|
) in response.content.decode("utf-8"))
|
||||||
self.assertEqual(response.context['post_url'], '/federate')
|
self.assertEqual(response.context['post_url'], '/federate')
|
||||||
except AttributeError:
|
|
||||||
pass
|
|
||||||
|
|
|
@ -12,6 +12,7 @@
|
||||||
"""Tests module for models"""
|
"""Tests module for models"""
|
||||||
from cas_server.default_settings import settings
|
from cas_server.default_settings import settings
|
||||||
|
|
||||||
|
import django
|
||||||
from django.test import TestCase, Client
|
from django.test import TestCase, Client
|
||||||
from django.test.utils import override_settings
|
from django.test.utils import override_settings
|
||||||
from django.utils import timezone
|
from django.utils import timezone
|
||||||
|
@ -60,31 +61,29 @@ class FederateSLOTestCase(TestCase, UserModels):
|
||||||
tests for clean_deleted_sessions that should delete object for which matching session
|
tests for clean_deleted_sessions that should delete object for which matching session
|
||||||
do not exists anymore
|
do not exists anymore
|
||||||
"""
|
"""
|
||||||
client1 = Client()
|
if django.VERSION >= (1, 8):
|
||||||
client2 = Client()
|
client1 = Client()
|
||||||
client1.get("/login")
|
client2 = Client()
|
||||||
client2.get("/login")
|
client1.get("/login")
|
||||||
session = client2.session
|
client2.get("/login")
|
||||||
session['authenticated'] = True
|
session = client2.session
|
||||||
try:
|
session['authenticated'] = True
|
||||||
session.save()
|
session.save()
|
||||||
except AttributeError:
|
models.FederateSLO.objects.create(
|
||||||
pass
|
username="test1@example.com",
|
||||||
models.FederateSLO.objects.create(
|
session_key=client1.session.session_key,
|
||||||
username="test1@example.com",
|
ticket=utils.gen_st()
|
||||||
session_key=client1.session.session_key,
|
)
|
||||||
ticket=utils.gen_st()
|
models.FederateSLO.objects.create(
|
||||||
)
|
username="test2@example.com",
|
||||||
models.FederateSLO.objects.create(
|
session_key=client2.session.session_key,
|
||||||
username="test2@example.com",
|
ticket=utils.gen_st()
|
||||||
session_key=client2.session.session_key,
|
)
|
||||||
ticket=utils.gen_st()
|
self.assertEqual(len(models.FederateSLO.objects.all()), 2)
|
||||||
)
|
models.FederateSLO.clean_deleted_sessions()
|
||||||
self.assertEqual(len(models.FederateSLO.objects.all()), 2)
|
self.assertEqual(len(models.FederateSLO.objects.all()), 1)
|
||||||
models.FederateSLO.clean_deleted_sessions()
|
with self.assertRaises(models.FederateSLO.DoesNotExist):
|
||||||
self.assertEqual(len(models.FederateSLO.objects.all()), 1)
|
models.FederateSLO.objects.get(username="test1@example.com")
|
||||||
with self.assertRaises(models.FederateSLO.DoesNotExist):
|
|
||||||
models.FederateSLO.objects.get(username="test1@example.com")
|
|
||||||
|
|
||||||
|
|
||||||
@override_settings(CAS_AUTH_CLASS='cas_server.auth.TestAuthUser')
|
@override_settings(CAS_AUTH_CLASS='cas_server.auth.TestAuthUser')
|
||||||
|
|
|
@ -208,20 +208,12 @@ class DummyCAS(BaseHTTPServer.BaseHTTPRequestHandler):
|
||||||
self.params.get("ticket").encode("ascii") == self.server.ticket
|
self.params.get("ticket").encode("ascii") == self.server.ticket
|
||||||
):
|
):
|
||||||
self.server.ticket = None
|
self.server.ticket = None
|
||||||
print("good")
|
|
||||||
return True
|
return True
|
||||||
else:
|
else:
|
||||||
print("bad (%r, %r) != (%r, %r)" % (
|
|
||||||
self.params.get("service").encode("ascii"),
|
|
||||||
self.params.get("ticket").encode("ascii"),
|
|
||||||
self.server.service,
|
|
||||||
self.server.ticket
|
|
||||||
))
|
|
||||||
|
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def send_headers(self, code, content_type):
|
def send_headers(self, code, content_type):
|
||||||
self.send_response(200)
|
self.send_response(code)
|
||||||
self.send_header("Content-type", content_type)
|
self.send_header("Content-type", content_type)
|
||||||
self.end_headers()
|
self.end_headers()
|
||||||
|
|
||||||
|
@ -241,19 +233,19 @@ class DummyCAS(BaseHTTPServer.BaseHTTPRequestHandler):
|
||||||
}:
|
}:
|
||||||
self.send_headers(200, "text/xml; charset=utf-8")
|
self.send_headers(200, "text/xml; charset=utf-8")
|
||||||
if self.test_params():
|
if self.test_params():
|
||||||
t = loader.get_template('cas_server/serviceValidate.xml')
|
template = loader.get_template('cas_server/serviceValidate.xml')
|
||||||
c = Context({
|
context = Context({
|
||||||
'username': self.server.username,
|
'username': self.server.username,
|
||||||
'attributes': self.server.attributes
|
'attributes': self.server.attributes
|
||||||
})
|
})
|
||||||
self.wfile.write(return_bytes(t.render(c), "utf8"))
|
self.wfile.write(return_bytes(template.render(context), "utf8"))
|
||||||
else:
|
else:
|
||||||
t = loader.get_template('cas_server/serviceValidateError.xml')
|
template = loader.get_template('cas_server/serviceValidateError.xml')
|
||||||
c = Context({
|
context = Context({
|
||||||
'code': 'BAD_SERVICE_TICKET',
|
'code': 'BAD_SERVICE_TICKET',
|
||||||
'msg': 'Valids are (%r, %r)' % (self.server.service, self.server.ticket)
|
'msg': 'Valids are (%r, %r)' % (self.server.service, self.server.ticket)
|
||||||
})
|
})
|
||||||
self.wfile.write(return_bytes(t.render(c), "utf8"))
|
self.wfile.write(return_bytes(template.render(context), "utf8"))
|
||||||
else:
|
else:
|
||||||
self.return_404()
|
self.return_404()
|
||||||
|
|
||||||
|
@ -272,8 +264,8 @@ class DummyCAS(BaseHTTPServer.BaseHTTPRequestHandler):
|
||||||
ticket == self.server.ticket
|
ticket == self.server.ticket
|
||||||
):
|
):
|
||||||
self.server.ticket = None
|
self.server.ticket = None
|
||||||
t = loader.get_template('cas_server/samlValidate.xml')
|
template = loader.get_template('cas_server/samlValidate.xml')
|
||||||
c = Context({
|
context = Context({
|
||||||
'IssueInstant': timezone.now().isoformat(),
|
'IssueInstant': timezone.now().isoformat(),
|
||||||
'expireInstant': (timezone.now() + timedelta(seconds=60)).isoformat(),
|
'expireInstant': (timezone.now() + timedelta(seconds=60)).isoformat(),
|
||||||
'Recipient': self.server.service,
|
'Recipient': self.server.service,
|
||||||
|
@ -281,24 +273,22 @@ class DummyCAS(BaseHTTPServer.BaseHTTPRequestHandler):
|
||||||
'username': self.server.username,
|
'username': self.server.username,
|
||||||
'attributes': self.server.attributes,
|
'attributes': self.server.attributes,
|
||||||
})
|
})
|
||||||
self.wfile.write(return_bytes(t.render(c), "utf8"))
|
self.wfile.write(return_bytes(template.render(context), "utf8"))
|
||||||
else:
|
else:
|
||||||
t = loader.get_template('cas_server/samlValidateError.xml')
|
template = loader.get_template('cas_server/samlValidateError.xml')
|
||||||
c = Context({
|
context = Context({
|
||||||
'IssueInstant': timezone.now().isoformat(),
|
'IssueInstant': timezone.now().isoformat(),
|
||||||
'ResponseID': utils.gen_saml_id(),
|
'ResponseID': utils.gen_saml_id(),
|
||||||
'code': 'BAD_SERVICE_TICKET',
|
'code': 'BAD_SERVICE_TICKET',
|
||||||
'msg': 'Valids are (%r, %r)' % (self.server.service, self.server.ticket)
|
'msg': 'Valids are (%r, %r)' % (self.server.service, self.server.ticket)
|
||||||
})
|
})
|
||||||
self.wfile.write(return_bytes(t.render(c), "utf8"))
|
self.wfile.write(return_bytes(template.render(context), "utf8"))
|
||||||
else:
|
else:
|
||||||
self.return_404()
|
self.return_404()
|
||||||
|
|
||||||
def return_404(self):
|
def return_404(self):
|
||||||
self.send_response(404)
|
self.send_headers(404, "text/plain; charset=utf-8")
|
||||||
self.send_header(b"Content-type", "text/plain")
|
self.wfile.write("not found")
|
||||||
self.end_headers()
|
|
||||||
self.wfile.write("not found")
|
|
||||||
|
|
||||||
def log_message(self, *args):
|
def log_message(self, *args):
|
||||||
"""silent any log message"""
|
"""silent any log message"""
|
||||||
|
|
|
@ -134,8 +134,9 @@ class LogoutView(View, LogoutMixin):
|
||||||
if settings.CAS_FEDERATE:
|
if settings.CAS_FEDERATE:
|
||||||
if auth is not None:
|
if auth is not None:
|
||||||
params = utils.copy_params(request.GET)
|
params = utils.copy_params(request.GET)
|
||||||
url = utils.update_url(auth.get_logout_url(), params)
|
url = auth.get_logout_url()
|
||||||
return HttpResponseRedirect(url)
|
if url:
|
||||||
|
return HttpResponseRedirect(utils.update_url(url, params))
|
||||||
# if service is set, redirect to service after logout
|
# if service is set, redirect to service after logout
|
||||||
if self.service:
|
if self.service:
|
||||||
list(messages.get_messages(request)) # clean messages before leaving the django app
|
list(messages.get_messages(request)) # clean messages before leaving the django app
|
||||||
|
|
Loading…
Reference in a new issue