diff --git a/authentication/models/otp.py b/authentication/models/otp.py index f08bb49..270aa1b 100644 --- a/authentication/models/otp.py +++ b/authentication/models/otp.py @@ -4,19 +4,101 @@ from django.contrib.auth import get_user_model from annoying.fields import AutoOneToOneField from pyotp import TOTP, random_base32 +from datetime import datetime + class TOTPSecret(models.Model): + """A secret for a user's TOTP device + + Attributes: + user: The user to whom this secret belongs + secret: The secret + active: Whether this secret is currently active + """ + user = AutoOneToOneField(get_user_model(), models.CASCADE, primary_key=True) secret = models.CharField(max_length=32, default=random_base32) active = models.BooleanField(default=False) - def verify(self, token): - return TOTP(self.secret).verify(token) + def verify(self, token: str, window: int = 1, set_used=True) -> bool: + """Verify a TOTP token - def get_uri(self): - return TOTP(self.secret).provisioning_uri(name=self.user.email, issuer_name="KumiDC") + Args: + token (str): The token to verify + window (int, optional): The number of tokens to check before and after the current token. Defaults to 1. + + Returns: + bool: Whether the token was valid + """ + + if not self.active: + return False + + if UsedTOTPToken.is_used(self, token): + return False + + if TOTP(self.secret).verify(token, valid_window=window): + if set_used: + UsedTOTPToken.objects.create(secret=self, token=token) + return True + + return False + + def get_uri(self) -> str: + """Get the provisioning URI for this secret + + This is used to generate a QR code for the user to scan with their TOTP device. + + Returns: + str: The provisioning URI + """ + return TOTP(self.secret).provisioning_uri( + name=self.user.email, issuer_name="KumiDC" + ) + + def change_secret(self) -> str: + """Change the secret for this user + + This generates a new secret and saves it to the database. + + Returns: + str: The new secret + """ - def change_secret(self): self.secret = random_base32() self.save() - return self.secret \ No newline at end of file + return self.secret + + +class UsedTOTPToken(models.Model): + """A TOTP token that has been used + + We log these so that we can prevent replay attacks. + + Attributes: + secret: The secret that was used + token: The token that was used + timestamp: When the token was used + expiry: How long we should keep this token for + """ + secret = models.ForeignKey(TOTPSecret, on_delete=models.CASCADE) + token = models.CharField(max_length=6) + timestamp = models.DateTimeField(auto_now_add=True) + expiry = models.DateTimeField() + + def save(self, *args, **kwargs): + self.expiry = self.timestamp + timedelta(seconds=60) + super().save(*args, **kwargs) + + @classmethod + def is_used(cls, secret: TOTPSecret, token: str) -> bool: + """Check whether a token has been used + + Args: + secret (TOTPSecret): The secret that was used + token (str): The token that was used + + Returns: + bool: Whether the token has recently been used + """ + return cls.objects.filter(secret=secret, token=token, expiry__gte=datetime.now()).exists() \ No newline at end of file diff --git a/radius/__init__.py b/radius/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/radius/admin.py b/radius/admin.py new file mode 100644 index 0000000..8c38f3f --- /dev/null +++ b/radius/admin.py @@ -0,0 +1,3 @@ +from django.contrib import admin + +# Register your models here. diff --git a/radius/apps.py b/radius/apps.py new file mode 100644 index 0000000..8c83714 --- /dev/null +++ b/radius/apps.py @@ -0,0 +1,6 @@ +from django.apps import AppConfig + + +class RadiusConfig(AppConfig): + default_auto_field = "django.db.models.BigAutoField" + name = "radius" diff --git a/radius/management/commands/runradius.py b/radius/management/commands/runradius.py new file mode 100644 index 0000000..7fa9aed --- /dev/null +++ b/radius/management/commands/runradius.py @@ -0,0 +1,11 @@ +from radius.server import RadiusServer + +from django.core.management.base import BaseCommand + +class Command(BaseCommand): + help = "Run the RADIUS server" + + def handle(self, *args, **kwargs): + # Set up and run the RADIUS server + srv = RadiusServer() + srv.Run() \ No newline at end of file diff --git a/radius/migrations/__init__.py b/radius/migrations/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/radius/models.py b/radius/models.py new file mode 100644 index 0000000..156e232 --- /dev/null +++ b/radius/models.py @@ -0,0 +1,81 @@ +from django.db import models +from django.contrib.auth import get_user_model + +from datetime import datetime, timedelta + +from cidrfield.models import IPNetworkField + + +class RadiusChallengeChoice(models.TextChoices): + NONE = "none", "None" + TOTP = "totp", "Time-based One-time Password" + WEB = "web", "Web-based" + +class RadiusUser(models.Model): + user = models.OneToOneField(get_user_model(), on_delete=models.CASCADE) + challenge = models.CharField( + max_length=255, choices=RadiusChallengeChoice.choices, default=RadiusChallengeChoice.NONE + ) + +class RadiusNetwork(models.Model): + name = models.CharField(max_length=255) + network = IPNetworkField() + + +class RadiusUserIP(models.Model): + user = models.ForeignKey(RadiusUser, on_delete=models.CASCADE) + network = models.ForeignKey(RadiusNetwork, on_delete=models.CASCADE) + ipv4_address = models.GenericIPAddressField(protocol="ipv4", null=True, blank=True) + ipv6_prefix = models.GenericIPAddressField(protocol="ipv6", null=True, blank=True) + + class Meta: + unique_together = ("user", "network") + + def save(self, *args, **kwargs): + if not self.ip_address in self.network.network: + raise ValueError( + f"IPv4 address {self.ip_address} not in network {self.network.network}" + ) + if not self.ipv6_prefix in self.network.network: + raise ValueError( + f"IPv6 prefix {self.ipv6_prefix} not in network {self.network.network}" + ) + + super().save(*args, **kwargs) + + +class RadiusAccountingSession(models.Model): + id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False) + + user = models.ForeignKey(RadiusUser, on_delete=models.CASCADE) + ip = models.ForeignKey(RadiusUserIP, on_delete=models.CASCADE) + + start_time = models.DateTimeField(null=True, blank=True) + stop_time = models.DateTimeField(null=True, blank=True) + + def start(self): + self.start_time = datetime.now() + self.save() + + def stop(self): + self.stop_time = datetime.now() + self.save() + + +class RadiusAccountingEvent(models.Model): + session = models.ForeignKey(RadiusAccountingSession, on_delete=models.CASCADE) + + event_type = models.CharField(max_length=255) + event_time = models.DateTimeField(auto_now_add=True) + raw_data = models.TextField() + + +class RadiusWebAuthentication(models.Model): + user = models.ForeignKey(RadiusUser, on_delete=models.CASCADE) + timestamp = models.DateTimeField(auto_now_add=True) + expiry = models.DateTimeField() + ip = models.IPAddressField() + + def save(self, *args, **kwargs): + self.expiry = self.timestamp + timedelta(minutes=5) + super().save(*args, **kwargs) \ No newline at end of file diff --git a/radius/server.py b/radius/server.py new file mode 100644 index 0000000..da2747f --- /dev/null +++ b/radius/server.py @@ -0,0 +1,157 @@ +from django.contrib.auth import authenticate, get_user_model + +from pyrad import dictionary, packet, server + +from radius.models import ( + RadiusAccountingSession, + RadiusUser, + RadiusAccountingEvent, + RadiusWebAuthentication, +) +from authentication.helpers.otp import has_otp +from authentication.models import TOTPSecret + +import uuid + + +class RadiusServer(server.Server): + def HandleAuthPacket(self, pkt): + """Handle an authentication packet + + Args: + pkt (packet.Packet): The packet to handle + """ + if state: + # Handle the challenge response + if self.validate_challenge_response(packet): + # Consider the challenge successfully passed + reply = self.CreateReplyPacket(pkt) + reply.code = packet.AccessAccept + else: + # Challenge failed + reply = self.CreateReplyPacket(pkt) + reply.code = packet.AccessReject + self.SendReplyPacket(pkt.fd, reply) + return + + # No state implies this may be the first packet for authentication + # So we need to generate a challenge if necessary + + # First check if the user exists + user = authenticate(username=username, password=password) + + if user is None: + # User does not exist or password is incorrect + reply = self.CreateReplyPacket(pkt) + reply.code = packet.AccessReject + self.SendReplyPacket(pkt.fd, reply) + return + + if user.radiususer.challenge == "none": + # No challenge is required + reply = self.CreateReplyPacket(pkt) + reply.code = packet.AccessAccept + self.SendReplyPacket(pkt.fd, reply) + return + + # Generate a challenge + + state = str(uuid.uuid4()) + + if user.radiususer.challenge == "totp": + if not has_otp(user): + # This user is not configured for TOTP...? # TODO: Make sure this is handled in the admin + reply = self.CreateReplyPacket(pkt) + reply.code = packet.AccessReject + self.SendReplyPacket(pkt.fd, reply) + return + + challenge_reply = self.CreateReplyPacket( + pkt, + **{"State": state, "Reply-Message": "Please enter your TOTP token."} + ) + + challenge_reply.code = packet.AccessChallenge + self.SendReplyPacket(pkt.fd, challenge_reply) + return + + if user.radiususer.challenge == "web": + challenge_reply = self.CreateReplyPacket( + pkt, + **{ + "State": state, + "Reply-Message": "Please visit http://kumidc.local/auth to authenticate.", # TODO: Replace with a URL to our authentication page + } + ) + challenge_reply.code = packet.AccessChallenge + self.SendReplyPacket(pkt.fd, challenge_reply) + return + + # We should never get here. If we do, something has gone wrong. + reply = self.CreateReplyPacket(pkt) + reply.code = packet.AccessReject + self.SendReplyPacket(pkt.fd, reply) + + def validate_challenge_response(self, packet, password): + state = packet["State"][0] + username = packet["User-Name"][0] + challenge_response = packet.get("User-Password", [None])[0] + + user = get_user_model().objects.get(username=username) + + if not user: + # This user... doesn't exist? + return False + + if not user.radiususer.challenge: + # This user is not configured for challenge-response. Where did this response come from? + return False + + if user.radiususer.challenge == "totp": + if not has_otp(user): + # This user is not configured for TOTP... + return False + + totp = TOTPSecret.objects.get(user=user) + + return totp.verify( + challenge_response + ) # Returns True if the token is valid, False otherwise + + if user.radiususer.challenge == "web": + ip = packet["NAS-IP-Address"][0] + auth = RadiusWebAuthentication.objects.filter( + user=user, ip=ip, expiry__gte=datetime.now() + ) + auth.delete() # Delete any existing authentication sessions for this user + return True + + def HandleAcctPacket(self, pkt: packet.Packet): + """Handle an accounting packet + + Args: + pkt (packet.Packet): The packet to handle + """ + status_type = pkt["Acct-Status-Type"][0] + username = pkt["User-Name"][0] + session_id = pkt["Acct-Session-Id"][0] + + user = RadiusUser.objects.get(user__username=username) + + session, created = RadiusAccountingSession.objects.get_or_create( + id=session_id, + defaults={ + user: user, + }, + ) + + if status_type == "Start": + session.start() + elif status_type == "Stop": + session.stop() + + RadiusAccountingEvent.objects.create( + session=session, event_type=status_type, raw_data=pkt.packet + ) + + acct.save() diff --git a/radius/tests.py b/radius/tests.py new file mode 100644 index 0000000..7ce503c --- /dev/null +++ b/radius/tests.py @@ -0,0 +1,3 @@ +from django.test import TestCase + +# Create your tests here. diff --git a/radius/views.py b/radius/views.py new file mode 100644 index 0000000..91ea44a --- /dev/null +++ b/radius/views.py @@ -0,0 +1,3 @@ +from django.shortcuts import render + +# Create your views here.