From 70b5235be89ef0dd03b3c77eeb1664320b9039ea Mon Sep 17 00:00:00 2001 From: Kumi Date: Sun, 28 Jan 2024 22:23:05 +0100 Subject: [PATCH] Enhanced TOTP verification and added RADIUS support Implement additional security and functionality in authentication with the introduction of docstrings, type hints, and extended verification logic in the TOTP model to prevent repeated token use, improving robustness against replay attacks. Simultaneously, established the groundwork for RADIUS (Remote Authentication Dial-In User Service) support by creating models and management commands essential for handling authentication, accounting packets, and web-based authentication challenges, broadening the system's capability to integrate with network access servers and services. Resolves issues with token replay attacks and sets the stage for scalable network authentication mechanisms. --- authentication/models/otp.py | 94 +++++++++++++- radius/__init__.py | 0 radius/admin.py | 3 + radius/apps.py | 6 + radius/management/commands/runradius.py | 11 ++ radius/migrations/__init__.py | 0 radius/models.py | 81 ++++++++++++ radius/server.py | 157 ++++++++++++++++++++++++ radius/tests.py | 3 + radius/views.py | 3 + 10 files changed, 352 insertions(+), 6 deletions(-) create mode 100644 radius/__init__.py create mode 100644 radius/admin.py create mode 100644 radius/apps.py create mode 100644 radius/management/commands/runradius.py create mode 100644 radius/migrations/__init__.py create mode 100644 radius/models.py create mode 100644 radius/server.py create mode 100644 radius/tests.py create mode 100644 radius/views.py diff --git a/authentication/models/otp.py b/authentication/models/otp.py index f08bb49..270aa1b 100644 --- a/authentication/models/otp.py +++ b/authentication/models/otp.py @@ -4,19 +4,101 @@ from django.contrib.auth import get_user_model from annoying.fields import AutoOneToOneField from pyotp import TOTP, random_base32 +from datetime import datetime + class TOTPSecret(models.Model): + """A secret for a user's TOTP device + + Attributes: + user: The user to whom this secret belongs + secret: The secret + active: Whether this secret is currently active + """ + user = AutoOneToOneField(get_user_model(), models.CASCADE, primary_key=True) secret = models.CharField(max_length=32, default=random_base32) active = models.BooleanField(default=False) - def verify(self, token): - return TOTP(self.secret).verify(token) + def verify(self, token: str, window: int = 1, set_used=True) -> bool: + """Verify a TOTP token - def get_uri(self): - return TOTP(self.secret).provisioning_uri(name=self.user.email, issuer_name="KumiDC") + Args: + token (str): The token to verify + window (int, optional): The number of tokens to check before and after the current token. Defaults to 1. + + Returns: + bool: Whether the token was valid + """ + + if not self.active: + return False + + if UsedTOTPToken.is_used(self, token): + return False + + if TOTP(self.secret).verify(token, valid_window=window): + if set_used: + UsedTOTPToken.objects.create(secret=self, token=token) + return True + + return False + + def get_uri(self) -> str: + """Get the provisioning URI for this secret + + This is used to generate a QR code for the user to scan with their TOTP device. + + Returns: + str: The provisioning URI + """ + return TOTP(self.secret).provisioning_uri( + name=self.user.email, issuer_name="KumiDC" + ) + + def change_secret(self) -> str: + """Change the secret for this user + + This generates a new secret and saves it to the database. + + Returns: + str: The new secret + """ - def change_secret(self): self.secret = random_base32() self.save() - return self.secret \ No newline at end of file + return self.secret + + +class UsedTOTPToken(models.Model): + """A TOTP token that has been used + + We log these so that we can prevent replay attacks. + + Attributes: + secret: The secret that was used + token: The token that was used + timestamp: When the token was used + expiry: How long we should keep this token for + """ + secret = models.ForeignKey(TOTPSecret, on_delete=models.CASCADE) + token = models.CharField(max_length=6) + timestamp = models.DateTimeField(auto_now_add=True) + expiry = models.DateTimeField() + + def save(self, *args, **kwargs): + self.expiry = self.timestamp + timedelta(seconds=60) + super().save(*args, **kwargs) + + @classmethod + def is_used(cls, secret: TOTPSecret, token: str) -> bool: + """Check whether a token has been used + + Args: + secret (TOTPSecret): The secret that was used + token (str): The token that was used + + Returns: + bool: Whether the token has recently been used + """ + return cls.objects.filter(secret=secret, token=token, expiry__gte=datetime.now()).exists() \ No newline at end of file diff --git a/radius/__init__.py b/radius/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/radius/admin.py b/radius/admin.py new file mode 100644 index 0000000..8c38f3f --- /dev/null +++ b/radius/admin.py @@ -0,0 +1,3 @@ +from django.contrib import admin + +# Register your models here. diff --git a/radius/apps.py b/radius/apps.py new file mode 100644 index 0000000..8c83714 --- /dev/null +++ b/radius/apps.py @@ -0,0 +1,6 @@ +from django.apps import AppConfig + + +class RadiusConfig(AppConfig): + default_auto_field = "django.db.models.BigAutoField" + name = "radius" diff --git a/radius/management/commands/runradius.py b/radius/management/commands/runradius.py new file mode 100644 index 0000000..7fa9aed --- /dev/null +++ b/radius/management/commands/runradius.py @@ -0,0 +1,11 @@ +from radius.server import RadiusServer + +from django.core.management.base import BaseCommand + +class Command(BaseCommand): + help = "Run the RADIUS server" + + def handle(self, *args, **kwargs): + # Set up and run the RADIUS server + srv = RadiusServer() + srv.Run() \ No newline at end of file diff --git a/radius/migrations/__init__.py b/radius/migrations/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/radius/models.py b/radius/models.py new file mode 100644 index 0000000..156e232 --- /dev/null +++ b/radius/models.py @@ -0,0 +1,81 @@ +from django.db import models +from django.contrib.auth import get_user_model + +from datetime import datetime, timedelta + +from cidrfield.models import IPNetworkField + + +class RadiusChallengeChoice(models.TextChoices): + NONE = "none", "None" + TOTP = "totp", "Time-based One-time Password" + WEB = "web", "Web-based" + +class RadiusUser(models.Model): + user = models.OneToOneField(get_user_model(), on_delete=models.CASCADE) + challenge = models.CharField( + max_length=255, choices=RadiusChallengeChoice.choices, default=RadiusChallengeChoice.NONE + ) + +class RadiusNetwork(models.Model): + name = models.CharField(max_length=255) + network = IPNetworkField() + + +class RadiusUserIP(models.Model): + user = models.ForeignKey(RadiusUser, on_delete=models.CASCADE) + network = models.ForeignKey(RadiusNetwork, on_delete=models.CASCADE) + ipv4_address = models.GenericIPAddressField(protocol="ipv4", null=True, blank=True) + ipv6_prefix = models.GenericIPAddressField(protocol="ipv6", null=True, blank=True) + + class Meta: + unique_together = ("user", "network") + + def save(self, *args, **kwargs): + if not self.ip_address in self.network.network: + raise ValueError( + f"IPv4 address {self.ip_address} not in network {self.network.network}" + ) + if not self.ipv6_prefix in self.network.network: + raise ValueError( + f"IPv6 prefix {self.ipv6_prefix} not in network {self.network.network}" + ) + + super().save(*args, **kwargs) + + +class RadiusAccountingSession(models.Model): + id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False) + + user = models.ForeignKey(RadiusUser, on_delete=models.CASCADE) + ip = models.ForeignKey(RadiusUserIP, on_delete=models.CASCADE) + + start_time = models.DateTimeField(null=True, blank=True) + stop_time = models.DateTimeField(null=True, blank=True) + + def start(self): + self.start_time = datetime.now() + self.save() + + def stop(self): + self.stop_time = datetime.now() + self.save() + + +class RadiusAccountingEvent(models.Model): + session = models.ForeignKey(RadiusAccountingSession, on_delete=models.CASCADE) + + event_type = models.CharField(max_length=255) + event_time = models.DateTimeField(auto_now_add=True) + raw_data = models.TextField() + + +class RadiusWebAuthentication(models.Model): + user = models.ForeignKey(RadiusUser, on_delete=models.CASCADE) + timestamp = models.DateTimeField(auto_now_add=True) + expiry = models.DateTimeField() + ip = models.IPAddressField() + + def save(self, *args, **kwargs): + self.expiry = self.timestamp + timedelta(minutes=5) + super().save(*args, **kwargs) \ No newline at end of file diff --git a/radius/server.py b/radius/server.py new file mode 100644 index 0000000..da2747f --- /dev/null +++ b/radius/server.py @@ -0,0 +1,157 @@ +from django.contrib.auth import authenticate, get_user_model + +from pyrad import dictionary, packet, server + +from radius.models import ( + RadiusAccountingSession, + RadiusUser, + RadiusAccountingEvent, + RadiusWebAuthentication, +) +from authentication.helpers.otp import has_otp +from authentication.models import TOTPSecret + +import uuid + + +class RadiusServer(server.Server): + def HandleAuthPacket(self, pkt): + """Handle an authentication packet + + Args: + pkt (packet.Packet): The packet to handle + """ + if state: + # Handle the challenge response + if self.validate_challenge_response(packet): + # Consider the challenge successfully passed + reply = self.CreateReplyPacket(pkt) + reply.code = packet.AccessAccept + else: + # Challenge failed + reply = self.CreateReplyPacket(pkt) + reply.code = packet.AccessReject + self.SendReplyPacket(pkt.fd, reply) + return + + # No state implies this may be the first packet for authentication + # So we need to generate a challenge if necessary + + # First check if the user exists + user = authenticate(username=username, password=password) + + if user is None: + # User does not exist or password is incorrect + reply = self.CreateReplyPacket(pkt) + reply.code = packet.AccessReject + self.SendReplyPacket(pkt.fd, reply) + return + + if user.radiususer.challenge == "none": + # No challenge is required + reply = self.CreateReplyPacket(pkt) + reply.code = packet.AccessAccept + self.SendReplyPacket(pkt.fd, reply) + return + + # Generate a challenge + + state = str(uuid.uuid4()) + + if user.radiususer.challenge == "totp": + if not has_otp(user): + # This user is not configured for TOTP...? # TODO: Make sure this is handled in the admin + reply = self.CreateReplyPacket(pkt) + reply.code = packet.AccessReject + self.SendReplyPacket(pkt.fd, reply) + return + + challenge_reply = self.CreateReplyPacket( + pkt, + **{"State": state, "Reply-Message": "Please enter your TOTP token."} + ) + + challenge_reply.code = packet.AccessChallenge + self.SendReplyPacket(pkt.fd, challenge_reply) + return + + if user.radiususer.challenge == "web": + challenge_reply = self.CreateReplyPacket( + pkt, + **{ + "State": state, + "Reply-Message": "Please visit http://kumidc.local/auth to authenticate.", # TODO: Replace with a URL to our authentication page + } + ) + challenge_reply.code = packet.AccessChallenge + self.SendReplyPacket(pkt.fd, challenge_reply) + return + + # We should never get here. If we do, something has gone wrong. + reply = self.CreateReplyPacket(pkt) + reply.code = packet.AccessReject + self.SendReplyPacket(pkt.fd, reply) + + def validate_challenge_response(self, packet, password): + state = packet["State"][0] + username = packet["User-Name"][0] + challenge_response = packet.get("User-Password", [None])[0] + + user = get_user_model().objects.get(username=username) + + if not user: + # This user... doesn't exist? + return False + + if not user.radiususer.challenge: + # This user is not configured for challenge-response. Where did this response come from? + return False + + if user.radiususer.challenge == "totp": + if not has_otp(user): + # This user is not configured for TOTP... + return False + + totp = TOTPSecret.objects.get(user=user) + + return totp.verify( + challenge_response + ) # Returns True if the token is valid, False otherwise + + if user.radiususer.challenge == "web": + ip = packet["NAS-IP-Address"][0] + auth = RadiusWebAuthentication.objects.filter( + user=user, ip=ip, expiry__gte=datetime.now() + ) + auth.delete() # Delete any existing authentication sessions for this user + return True + + def HandleAcctPacket(self, pkt: packet.Packet): + """Handle an accounting packet + + Args: + pkt (packet.Packet): The packet to handle + """ + status_type = pkt["Acct-Status-Type"][0] + username = pkt["User-Name"][0] + session_id = pkt["Acct-Session-Id"][0] + + user = RadiusUser.objects.get(user__username=username) + + session, created = RadiusAccountingSession.objects.get_or_create( + id=session_id, + defaults={ + user: user, + }, + ) + + if status_type == "Start": + session.start() + elif status_type == "Stop": + session.stop() + + RadiusAccountingEvent.objects.create( + session=session, event_type=status_type, raw_data=pkt.packet + ) + + acct.save() diff --git a/radius/tests.py b/radius/tests.py new file mode 100644 index 0000000..7ce503c --- /dev/null +++ b/radius/tests.py @@ -0,0 +1,3 @@ +from django.test import TestCase + +# Create your tests here. diff --git a/radius/views.py b/radius/views.py new file mode 100644 index 0000000..91ea44a --- /dev/null +++ b/radius/views.py @@ -0,0 +1,3 @@ +from django.shortcuts import render + +# Create your views here.