Enhanced TOTP verification and added RADIUS support

Implement additional security and functionality in authentication with the introduction of docstrings, type hints, and extended verification logic in the TOTP model to prevent repeated token use, improving robustness against replay attacks. Simultaneously, established the groundwork for RADIUS (Remote Authentication Dial-In User Service) support by creating models and management commands essential for handling authentication, accounting packets, and web-based authentication challenges, broadening the system's capability to integrate with network access servers and services.

Resolves issues with token replay attacks and sets the stage for scalable network authentication mechanisms.
This commit is contained in:
Kumi 2024-01-28 22:23:05 +01:00
parent 52fbb394ed
commit 70b5235be8
Signed by: kumi
GPG key ID: ECBCC9082395383F
10 changed files with 352 additions and 6 deletions

View file

@ -4,19 +4,101 @@ from django.contrib.auth import get_user_model
from annoying.fields import AutoOneToOneField
from pyotp import TOTP, random_base32
from datetime import datetime
class TOTPSecret(models.Model):
"""A secret for a user's TOTP device
Attributes:
user: The user to whom this secret belongs
secret: The secret
active: Whether this secret is currently active
"""
user = AutoOneToOneField(get_user_model(), models.CASCADE, primary_key=True)
secret = models.CharField(max_length=32, default=random_base32)
active = models.BooleanField(default=False)
def verify(self, token):
return TOTP(self.secret).verify(token)
def verify(self, token: str, window: int = 1, set_used=True) -> bool:
"""Verify a TOTP token
def get_uri(self):
return TOTP(self.secret).provisioning_uri(name=self.user.email, issuer_name="KumiDC")
Args:
token (str): The token to verify
window (int, optional): The number of tokens to check before and after the current token. Defaults to 1.
Returns:
bool: Whether the token was valid
"""
if not self.active:
return False
if UsedTOTPToken.is_used(self, token):
return False
if TOTP(self.secret).verify(token, valid_window=window):
if set_used:
UsedTOTPToken.objects.create(secret=self, token=token)
return True
return False
def get_uri(self) -> str:
"""Get the provisioning URI for this secret
This is used to generate a QR code for the user to scan with their TOTP device.
Returns:
str: The provisioning URI
"""
return TOTP(self.secret).provisioning_uri(
name=self.user.email, issuer_name="KumiDC"
)
def change_secret(self) -> str:
"""Change the secret for this user
This generates a new secret and saves it to the database.
Returns:
str: The new secret
"""
def change_secret(self):
self.secret = random_base32()
self.save()
return self.secret
return self.secret
class UsedTOTPToken(models.Model):
"""A TOTP token that has been used
We log these so that we can prevent replay attacks.
Attributes:
secret: The secret that was used
token: The token that was used
timestamp: When the token was used
expiry: How long we should keep this token for
"""
secret = models.ForeignKey(TOTPSecret, on_delete=models.CASCADE)
token = models.CharField(max_length=6)
timestamp = models.DateTimeField(auto_now_add=True)
expiry = models.DateTimeField()
def save(self, *args, **kwargs):
self.expiry = self.timestamp + timedelta(seconds=60)
super().save(*args, **kwargs)
@classmethod
def is_used(cls, secret: TOTPSecret, token: str) -> bool:
"""Check whether a token has been used
Args:
secret (TOTPSecret): The secret that was used
token (str): The token that was used
Returns:
bool: Whether the token has recently been used
"""
return cls.objects.filter(secret=secret, token=token, expiry__gte=datetime.now()).exists()

0
radius/__init__.py Normal file
View file

3
radius/admin.py Normal file
View file

@ -0,0 +1,3 @@
from django.contrib import admin
# Register your models here.

6
radius/apps.py Normal file
View file

@ -0,0 +1,6 @@
from django.apps import AppConfig
class RadiusConfig(AppConfig):
default_auto_field = "django.db.models.BigAutoField"
name = "radius"

View file

@ -0,0 +1,11 @@
from radius.server import RadiusServer
from django.core.management.base import BaseCommand
class Command(BaseCommand):
help = "Run the RADIUS server"
def handle(self, *args, **kwargs):
# Set up and run the RADIUS server
srv = RadiusServer()
srv.Run()

View file

81
radius/models.py Normal file
View file

@ -0,0 +1,81 @@
from django.db import models
from django.contrib.auth import get_user_model
from datetime import datetime, timedelta
from cidrfield.models import IPNetworkField
class RadiusChallengeChoice(models.TextChoices):
NONE = "none", "None"
TOTP = "totp", "Time-based One-time Password"
WEB = "web", "Web-based"
class RadiusUser(models.Model):
user = models.OneToOneField(get_user_model(), on_delete=models.CASCADE)
challenge = models.CharField(
max_length=255, choices=RadiusChallengeChoice.choices, default=RadiusChallengeChoice.NONE
)
class RadiusNetwork(models.Model):
name = models.CharField(max_length=255)
network = IPNetworkField()
class RadiusUserIP(models.Model):
user = models.ForeignKey(RadiusUser, on_delete=models.CASCADE)
network = models.ForeignKey(RadiusNetwork, on_delete=models.CASCADE)
ipv4_address = models.GenericIPAddressField(protocol="ipv4", null=True, blank=True)
ipv6_prefix = models.GenericIPAddressField(protocol="ipv6", null=True, blank=True)
class Meta:
unique_together = ("user", "network")
def save(self, *args, **kwargs):
if not self.ip_address in self.network.network:
raise ValueError(
f"IPv4 address {self.ip_address} not in network {self.network.network}"
)
if not self.ipv6_prefix in self.network.network:
raise ValueError(
f"IPv6 prefix {self.ipv6_prefix} not in network {self.network.network}"
)
super().save(*args, **kwargs)
class RadiusAccountingSession(models.Model):
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
user = models.ForeignKey(RadiusUser, on_delete=models.CASCADE)
ip = models.ForeignKey(RadiusUserIP, on_delete=models.CASCADE)
start_time = models.DateTimeField(null=True, blank=True)
stop_time = models.DateTimeField(null=True, blank=True)
def start(self):
self.start_time = datetime.now()
self.save()
def stop(self):
self.stop_time = datetime.now()
self.save()
class RadiusAccountingEvent(models.Model):
session = models.ForeignKey(RadiusAccountingSession, on_delete=models.CASCADE)
event_type = models.CharField(max_length=255)
event_time = models.DateTimeField(auto_now_add=True)
raw_data = models.TextField()
class RadiusWebAuthentication(models.Model):
user = models.ForeignKey(RadiusUser, on_delete=models.CASCADE)
timestamp = models.DateTimeField(auto_now_add=True)
expiry = models.DateTimeField()
ip = models.IPAddressField()
def save(self, *args, **kwargs):
self.expiry = self.timestamp + timedelta(minutes=5)
super().save(*args, **kwargs)

157
radius/server.py Normal file
View file

@ -0,0 +1,157 @@
from django.contrib.auth import authenticate, get_user_model
from pyrad import dictionary, packet, server
from radius.models import (
RadiusAccountingSession,
RadiusUser,
RadiusAccountingEvent,
RadiusWebAuthentication,
)
from authentication.helpers.otp import has_otp
from authentication.models import TOTPSecret
import uuid
class RadiusServer(server.Server):
def HandleAuthPacket(self, pkt):
"""Handle an authentication packet
Args:
pkt (packet.Packet): The packet to handle
"""
if state:
# Handle the challenge response
if self.validate_challenge_response(packet):
# Consider the challenge successfully passed
reply = self.CreateReplyPacket(pkt)
reply.code = packet.AccessAccept
else:
# Challenge failed
reply = self.CreateReplyPacket(pkt)
reply.code = packet.AccessReject
self.SendReplyPacket(pkt.fd, reply)
return
# No state implies this may be the first packet for authentication
# So we need to generate a challenge if necessary
# First check if the user exists
user = authenticate(username=username, password=password)
if user is None:
# User does not exist or password is incorrect
reply = self.CreateReplyPacket(pkt)
reply.code = packet.AccessReject
self.SendReplyPacket(pkt.fd, reply)
return
if user.radiususer.challenge == "none":
# No challenge is required
reply = self.CreateReplyPacket(pkt)
reply.code = packet.AccessAccept
self.SendReplyPacket(pkt.fd, reply)
return
# Generate a challenge
state = str(uuid.uuid4())
if user.radiususer.challenge == "totp":
if not has_otp(user):
# This user is not configured for TOTP...? # TODO: Make sure this is handled in the admin
reply = self.CreateReplyPacket(pkt)
reply.code = packet.AccessReject
self.SendReplyPacket(pkt.fd, reply)
return
challenge_reply = self.CreateReplyPacket(
pkt,
**{"State": state, "Reply-Message": "Please enter your TOTP token."}
)
challenge_reply.code = packet.AccessChallenge
self.SendReplyPacket(pkt.fd, challenge_reply)
return
if user.radiususer.challenge == "web":
challenge_reply = self.CreateReplyPacket(
pkt,
**{
"State": state,
"Reply-Message": "Please visit http://kumidc.local/auth to authenticate.", # TODO: Replace with a URL to our authentication page
}
)
challenge_reply.code = packet.AccessChallenge
self.SendReplyPacket(pkt.fd, challenge_reply)
return
# We should never get here. If we do, something has gone wrong.
reply = self.CreateReplyPacket(pkt)
reply.code = packet.AccessReject
self.SendReplyPacket(pkt.fd, reply)
def validate_challenge_response(self, packet, password):
state = packet["State"][0]
username = packet["User-Name"][0]
challenge_response = packet.get("User-Password", [None])[0]
user = get_user_model().objects.get(username=username)
if not user:
# This user... doesn't exist?
return False
if not user.radiususer.challenge:
# This user is not configured for challenge-response. Where did this response come from?
return False
if user.radiususer.challenge == "totp":
if not has_otp(user):
# This user is not configured for TOTP...
return False
totp = TOTPSecret.objects.get(user=user)
return totp.verify(
challenge_response
) # Returns True if the token is valid, False otherwise
if user.radiususer.challenge == "web":
ip = packet["NAS-IP-Address"][0]
auth = RadiusWebAuthentication.objects.filter(
user=user, ip=ip, expiry__gte=datetime.now()
)
auth.delete() # Delete any existing authentication sessions for this user
return True
def HandleAcctPacket(self, pkt: packet.Packet):
"""Handle an accounting packet
Args:
pkt (packet.Packet): The packet to handle
"""
status_type = pkt["Acct-Status-Type"][0]
username = pkt["User-Name"][0]
session_id = pkt["Acct-Session-Id"][0]
user = RadiusUser.objects.get(user__username=username)
session, created = RadiusAccountingSession.objects.get_or_create(
id=session_id,
defaults={
user: user,
},
)
if status_type == "Start":
session.start()
elif status_type == "Stop":
session.stop()
RadiusAccountingEvent.objects.create(
session=session, event_type=status_type, raw_data=pkt.packet
)
acct.save()

3
radius/tests.py Normal file
View file

@ -0,0 +1,3 @@
from django.test import TestCase
# Create your tests here.

3
radius/views.py Normal file
View file

@ -0,0 +1,3 @@
from django.shortcuts import render
# Create your views here.