213 lines
No EOL
9.1 KiB
Python
213 lines
No EOL
9.1 KiB
Python
from django.conf import settings
|
|
from django.views.generic import FormView, View, TemplateView
|
|
from django.contrib.auth import authenticate, login, logout, get_user_model
|
|
from django.shortcuts import redirect
|
|
from django.core.exceptions import PermissionDenied
|
|
from django.contrib import messages
|
|
from django.utils import timezone
|
|
from django.db.models import Max
|
|
from django.views.decorators.cache import never_cache
|
|
|
|
from core.forms import LoginForm, OTPSelectorForm, OTPVerificationForm, PWResetForm, PWRequestForm
|
|
from core.models.auth import LoginSession, PWResetToken, IPLimit, LoginLog
|
|
from core.helpers.otp import get_user_otps, get_otp_choices, get_otp_by_name
|
|
from core.helpers.mail import simple_send_mail
|
|
from core.helpers.auth import generate_pwreset_mail, login_fail, login_success, request_password
|
|
from core.helpers.request import get_client_ip
|
|
|
|
from dbsettings.functions import getValue
|
|
|
|
class RateLimitedView(TemplateView):
|
|
template_name = f"{settings.EXPEPHALON_BACKEND}/auth/ratelimit.html"
|
|
|
|
@never_cache
|
|
def dispatch(self, request, *args, **kwargs):
|
|
try:
|
|
limit = IPLimit.objects.filter(ip=get_client_ip(request), end__gte=timezone.now()).first()
|
|
messages.error(request, f"Sorry, your IP has been blocked, so you cannot login at the moment.{f' Reason: {limit.reason}' if limit.reason else ''} Please try again after {str(iplimit.end)}, or contact support if you need help getting into your account.")
|
|
return super().dispatch(request, *args, **kwargs)
|
|
except:
|
|
return redirect("login")
|
|
|
|
class AuthView(FormView):
|
|
@never_cache
|
|
def dispatch(self, request, *args, **kwargs):
|
|
if IPLimit.objects.filter(ip=get_client_ip(request), end__gte=timezone.now()):
|
|
return redirect("ratelimited")
|
|
|
|
period = timezone.now() - timezone.timedelta(seconds=int(getValue("core.auth.ratelimit.period", 600)))
|
|
failures = LoginLog.objects.filter(ip=get_client_ip(request), success=False, timestamp__gte=period)
|
|
if len(failures) >= int(getValue("core.auth.ratelimit.attempts", 5)):
|
|
IPLimit.objects.create(ip=get_client_ip(request), end=timezone.now() + timezone.timedelta(seconds=getValue("core.auth.ratelimit.block", 3600)), reason="Too many failed login attempts.")
|
|
return redirect("ratelimited")
|
|
|
|
return super().dispatch(request, *args, **kwargs)
|
|
|
|
class LoginView(AuthView):
|
|
template_name = f"{settings.EXPEPHALON_BACKEND}/auth/login.html"
|
|
form_class = LoginForm
|
|
|
|
def get(self, request, *args, **kwargs):
|
|
if request.user.is_authenticated:
|
|
return redirect(request.GET.get("next", "dashboard"))
|
|
return super().get(request, *args, **kwargs)
|
|
|
|
def get_context_data(self, **kwargs):
|
|
context = super().get_context_data(**kwargs)
|
|
context["title"] = "Login"
|
|
return context
|
|
|
|
def form_invalid(self, form):
|
|
try:
|
|
user = get_user_model().objects.get(username=form.cleaned_data.get("email", ""))
|
|
except get_user_model().DoesNotExist:
|
|
user = None
|
|
login_fail(self.request, user, "The credentials you entered are invalid. Please try again.")
|
|
return super().form_invalid(form)
|
|
|
|
def form_valid(self, form):
|
|
user = authenticate(username=form.cleaned_data['email'], password=form.cleaned_data['password'])
|
|
if user:
|
|
if not get_user_otps(user):
|
|
login(self.request, user)
|
|
return redirect("dashboard")
|
|
session = LoginSession.objects.create(user=user)
|
|
self.request.session["otpsession"] = str(session.uuid)
|
|
self.request.session["next"] = self.request.GET.get("next", "dashboard")
|
|
return redirect("otpselector")
|
|
return self.form_invalid(form)
|
|
|
|
class OTPSelectorView(AuthView):
|
|
template_name = f"{settings.EXPEPHALON_BACKEND}/auth/otp_selector.html"
|
|
form_class = OTPSelectorForm
|
|
|
|
def clean_session(self):
|
|
for key in ("otpsession", "otpprovider", "next"):
|
|
try:
|
|
del self.request.session[key]
|
|
except:
|
|
pass
|
|
|
|
def get_form_kwargs(self):
|
|
kwargs = super().get_form_kwargs()
|
|
try:
|
|
assert self.request.session["otpsession"]
|
|
except:
|
|
raise PermissionDenied()
|
|
user = LoginSession.objects.get(uuid=self.request.session["otpsession"]).user
|
|
kwargs["otp_choices"] = get_otp_choices(user)
|
|
return kwargs
|
|
|
|
def form_valid(self, form):
|
|
self.request.session["otpprovider"] = form.cleaned_data["provider"]
|
|
return redirect("otpvalidator")
|
|
|
|
def form_invalid(self, form):
|
|
self.clean_session()
|
|
messages.error("Something went wrong selecting the OTP provider. Please try again.")
|
|
return redirect("login")
|
|
|
|
def get_context_data(self, **kwargs):
|
|
context = super().get_context_data(**kwargs)
|
|
context["title"] = "Two-Factor Authentication"
|
|
|
|
user = LoginSession.objects.get(uuid=self.request.session["otpsession"]).user
|
|
context["first_name"] = user.profile.get_internal_name
|
|
return context
|
|
|
|
class OTPValidatorView(AuthView):
|
|
template_name = f"{settings.EXPEPHALON_BACKEND}/auth/otp_verifier.html"
|
|
form_class = OTPVerificationForm
|
|
|
|
def clean_session(self):
|
|
for key in ("otpsession", "otpprovider", "next"):
|
|
try:
|
|
del self.request.session[key]
|
|
except:
|
|
pass
|
|
|
|
def validate_session(self, request):
|
|
try:
|
|
assert request.session["otpsession"]
|
|
assert request.session["otpprovider"]
|
|
user = LoginSession.objects.get(uuid=request.session["otpsession"]).user
|
|
assert request.session["otpprovider"] in get_user_otps(user).keys()
|
|
provider = get_otp_by_name(request.session["otpprovider"])()
|
|
return user, provider
|
|
except:
|
|
self.clean_session()
|
|
raise PermissionDenied()
|
|
|
|
def get(self, request, *args, **kwargs):
|
|
user, provider = self.validate_session(request)
|
|
response = provider.start_authentication(user)
|
|
messages.info(request, response)
|
|
return super().get(request, *args, **kwargs)
|
|
|
|
def post(self, request, *args, **kwargs):
|
|
self.validate_session(request)
|
|
return super().post(request, *args, **kwargs)
|
|
|
|
def form_invalid(self, form):
|
|
user, provider = self.validate_session(self.request)
|
|
self.clean_session()
|
|
login_fail(self.request, user, "Incorrect token entered. Please try again. If the issue persists, contact support to regain access to your account.")
|
|
return redirect("login")
|
|
|
|
def form_valid(self, form):
|
|
user, provider = self.validate_session(self.request)
|
|
if provider.validate_token(user, form.cleaned_data["token"]):
|
|
login(self.request, user)
|
|
ret = redirect(self.request.session.get("next", "dashboard"))
|
|
self.clean_session()
|
|
return ret
|
|
return self.form_invalid(form)
|
|
|
|
def get_context_data(self, **kwargs):
|
|
context = super().get_context_data(**kwargs)
|
|
context["title"] = "Two-Factor Authentication"
|
|
|
|
user, provider = self.validate_session(self.request)
|
|
context["first_name"] = user.profile.get_internal_name
|
|
return context
|
|
|
|
class LogoutView(View):
|
|
def get(self, request, *args, **kwargs):
|
|
logout(request)
|
|
return redirect("login")
|
|
|
|
class PWResetView(AuthView):
|
|
template_name = f"{settings.EXPEPHALON_BACKEND}/auth/pwreset.html"
|
|
form_class = PWResetForm
|
|
|
|
def validate_session(self):
|
|
try:
|
|
token = PWResetToken.objects.get(token=self.kwargs["pk"])
|
|
max_age = int(getValue("core.auth.pwreset.max_age", "86400"))
|
|
assert token.creation > timezone.now() - timezone.timedelta(seconds=max_age)
|
|
return token.user
|
|
except:
|
|
messages.error(self.request, "Incorrect or expired password reset link.")
|
|
raise PermissionDenied()
|
|
|
|
def form_valid(self, form):
|
|
user = self.validate_session()
|
|
if not form.cleaned_data["password1"] == form.cleaned_data["password2"]:
|
|
messages.error(self.request, "Entered passwords do not match - please try again.")
|
|
return self.form_invalid(form)
|
|
user.set_password(form.cleaned_data["password1"])
|
|
user.save()
|
|
messages.success(self.request, "Your password has been changed. You can now login with your new password.")
|
|
return redirect("login")
|
|
|
|
class PWRequestView(AuthView):
|
|
template_name = f"{settings.EXPEPHALON_BACKEND}/auth/pwrequest.html"
|
|
form_class = PWRequestForm
|
|
|
|
def form_valid(self, form):
|
|
try:
|
|
user = get_user_model().objects.get(username=form.cleaned_data["email"])
|
|
request_password(user)
|
|
finally:
|
|
messages.success(self.request, "If a matching account was found, you should shortly receive an email containing password reset instructions. If you have not received this message after five minutes, please verify that you have entered the correct email address, or contact support.")
|
|
return redirect("login") |