Compare commits


No commits in common. "radius" and "main" have entirely different histories.
radius ... main

11 changed files with 29 additions and 374 deletions

View file

@ -4,101 +4,19 @@ from django.contrib.auth import get_user_model
from annoying.fields import AutoOneToOneField from annoying.fields import AutoOneToOneField
from pyotp import TOTP, random_base32 from pyotp import TOTP, random_base32
from datetime import datetime
class TOTPSecret(models.Model): class TOTPSecret(models.Model):
"""A secret for a user's TOTP device
user: The user to whom this secret belongs
secret: The secret
active: Whether this secret is currently active
user = AutoOneToOneField(get_user_model(), models.CASCADE, primary_key=True) user = AutoOneToOneField(get_user_model(), models.CASCADE, primary_key=True)
secret = models.CharField(max_length=32, default=random_base32) secret = models.CharField(max_length=32, default=random_base32)
active = models.BooleanField(default=False) active = models.BooleanField(default=False)
def verify(self, token: str, window: int = 1, set_used=True) -> bool: def verify(self, token):
"""Verify a TOTP token return TOTP(self.secret).verify(token)
Args: def get_uri(self):
token (str): The token to verify return TOTP(self.secret).provisioning_uri(, issuer_name="KumiDC")
window (int, optional): The number of tokens to check before and after the current token. Defaults to 1.
bool: Whether the token was valid
if not
return False
if UsedTOTPToken.is_used(self, token):
return False
if TOTP(self.secret).verify(token, valid_window=window):
if set_used:
UsedTOTPToken.objects.create(secret=self, token=token)
return True
return False
def get_uri(self) -> str:
"""Get the provisioning URI for this secret
This is used to generate a QR code for the user to scan with their TOTP device.
str: The provisioning URI
return TOTP(self.secret).provisioning_uri(, issuer_name="KumiDC"
def change_secret(self) -> str:
"""Change the secret for this user
This generates a new secret and saves it to the database.
str: The new secret
def change_secret(self):
self.secret = random_base32() self.secret = random_base32()
return self.secret return self.secret
class UsedTOTPToken(models.Model):
"""A TOTP token that has been used
We log these so that we can prevent replay attacks.
secret: The secret that was used
token: The token that was used
timestamp: When the token was used
expiry: How long we should keep this token for
secret = models.ForeignKey(TOTPSecret, on_delete=models.CASCADE)
token = models.CharField(max_length=6)
timestamp = models.DateTimeField(auto_now_add=True)
expiry = models.DateTimeField()
def save(self, *args, **kwargs):
self.expiry = self.timestamp + timedelta(seconds=60)
super().save(*args, **kwargs)
def is_used(cls, secret: TOTPSecret, token: str) -> bool:
"""Check whether a token has been used
secret (TOTPSecret): The secret that was used
token (str): The token that was used
bool: Whether the token has recently been used
return cls.objects.filter(secret=secret, token=token,

View file

View file

@ -1,3 +0,0 @@
from django.contrib import admin
# Register your models here.

View file

@ -1,6 +0,0 @@
from django.apps import AppConfig
class RadiusConfig(AppConfig):
default_auto_field = "django.db.models.BigAutoField"
name = "radius"

View file

@ -1,11 +0,0 @@
from radius.server import RadiusServer
from import BaseCommand
class Command(BaseCommand):
help = "Run the RADIUS server"
def handle(self, *args, **kwargs):
# Set up and run the RADIUS server
srv = RadiusServer()

View file

@ -1,81 +0,0 @@
from django.db import models
from django.contrib.auth import get_user_model
from datetime import datetime, timedelta
from cidrfield.models import IPNetworkField
class RadiusChallengeChoice(models.TextChoices):
NONE = "none", "None"
TOTP = "totp", "Time-based One-time Password"
WEB = "web", "Web-based"
class RadiusUser(models.Model):
user = models.OneToOneField(get_user_model(), on_delete=models.CASCADE)
challenge = models.CharField(
max_length=255, choices=RadiusChallengeChoice.choices, default=RadiusChallengeChoice.NONE
class RadiusNetwork(models.Model):
name = models.CharField(max_length=255)
network = IPNetworkField()
class RadiusUserIP(models.Model):
user = models.ForeignKey(RadiusUser, on_delete=models.CASCADE)
network = models.ForeignKey(RadiusNetwork, on_delete=models.CASCADE)
ipv4_address = models.GenericIPAddressField(protocol="ipv4", null=True, blank=True)
ipv6_prefix = models.GenericIPAddressField(protocol="ipv6", null=True, blank=True)
class Meta:
unique_together = ("user", "network")
def save(self, *args, **kwargs):
if not self.ip_address in
raise ValueError(
f"IPv4 address {self.ip_address} not in network {}"
if not self.ipv6_prefix in
raise ValueError(
f"IPv6 prefix {self.ipv6_prefix} not in network {}"
super().save(*args, **kwargs)
class RadiusAccountingSession(models.Model):
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
user = models.ForeignKey(RadiusUser, on_delete=models.CASCADE)
ip = models.ForeignKey(RadiusUserIP, on_delete=models.CASCADE)
start_time = models.DateTimeField(null=True, blank=True)
stop_time = models.DateTimeField(null=True, blank=True)
def start(self):
self.start_time =
def stop(self):
self.stop_time =
class RadiusAccountingEvent(models.Model):
session = models.ForeignKey(RadiusAccountingSession, on_delete=models.CASCADE)
event_type = models.CharField(max_length=255)
event_time = models.DateTimeField(auto_now_add=True)
raw_data = models.TextField()
class RadiusWebAuthentication(models.Model):
user = models.ForeignKey(RadiusUser, on_delete=models.CASCADE)
timestamp = models.DateTimeField(auto_now_add=True)
expiry = models.DateTimeField()
ip = models.IPAddressField()
def save(self, *args, **kwargs):
self.expiry = self.timestamp + timedelta(minutes=5)
super().save(*args, **kwargs)

View file

@ -1,157 +0,0 @@
from django.contrib.auth import authenticate, get_user_model
from pyrad import dictionary, packet, server
from radius.models import (
from authentication.helpers.otp import has_otp
from authentication.models import TOTPSecret
import uuid
class RadiusServer(server.Server):
def HandleAuthPacket(self, pkt):
"""Handle an authentication packet
pkt (packet.Packet): The packet to handle
if state:
# Handle the challenge response
if self.validate_challenge_response(packet):
# Consider the challenge successfully passed
reply = self.CreateReplyPacket(pkt)
reply.code = packet.AccessAccept
# Challenge failed
reply = self.CreateReplyPacket(pkt)
reply.code = packet.AccessReject
self.SendReplyPacket(pkt.fd, reply)
# No state implies this may be the first packet for authentication
# So we need to generate a challenge if necessary
# First check if the user exists
user = authenticate(username=username, password=password)
if user is None:
# User does not exist or password is incorrect
reply = self.CreateReplyPacket(pkt)
reply.code = packet.AccessReject
self.SendReplyPacket(pkt.fd, reply)
if user.radiususer.challenge == "none":
# No challenge is required
reply = self.CreateReplyPacket(pkt)
reply.code = packet.AccessAccept
self.SendReplyPacket(pkt.fd, reply)
# Generate a challenge
state = str(uuid.uuid4())
if user.radiususer.challenge == "totp":
if not has_otp(user):
# This user is not configured for TOTP...? # TODO: Make sure this is handled in the admin
reply = self.CreateReplyPacket(pkt)
reply.code = packet.AccessReject
self.SendReplyPacket(pkt.fd, reply)
challenge_reply = self.CreateReplyPacket(
**{"State": state, "Reply-Message": "Please enter your TOTP token."}
challenge_reply.code = packet.AccessChallenge
self.SendReplyPacket(pkt.fd, challenge_reply)
if user.radiususer.challenge == "web":
challenge_reply = self.CreateReplyPacket(
"State": state,
"Reply-Message": "Please visit http://kumidc.local/auth to authenticate.", # TODO: Replace with a URL to our authentication page
challenge_reply.code = packet.AccessChallenge
self.SendReplyPacket(pkt.fd, challenge_reply)
# We should never get here. If we do, something has gone wrong.
reply = self.CreateReplyPacket(pkt)
reply.code = packet.AccessReject
self.SendReplyPacket(pkt.fd, reply)
def validate_challenge_response(self, packet, password):
state = packet["State"][0]
username = packet["User-Name"][0]
challenge_response = packet.get("User-Password", [None])[0]
user = get_user_model().objects.get(username=username)
if not user:
# This user... doesn't exist?
return False
if not user.radiususer.challenge:
# This user is not configured for challenge-response. Where did this response come from?
return False
if user.radiususer.challenge == "totp":
if not has_otp(user):
# This user is not configured for TOTP...
return False
totp = TOTPSecret.objects.get(user=user)
return totp.verify(
) # Returns True if the token is valid, False otherwise
if user.radiususer.challenge == "web":
ip = packet["NAS-IP-Address"][0]
auth = RadiusWebAuthentication.objects.filter(
user=user, ip=ip,
auth.delete() # Delete any existing authentication sessions for this user
return True
def HandleAcctPacket(self, pkt: packet.Packet):
"""Handle an accounting packet
pkt (packet.Packet): The packet to handle
status_type = pkt["Acct-Status-Type"][0]
username = pkt["User-Name"][0]
session_id = pkt["Acct-Session-Id"][0]
user = RadiusUser.objects.get(user__username=username)
session, created = RadiusAccountingSession.objects.get_or_create(
user: user,
if status_type == "Start":
elif status_type == "Stop":
session=session, event_type=status_type, raw_data=pkt.packet

View file

@ -1,3 +0,0 @@
from django.test import TestCase
# Create your tests here.

View file

@ -1,3 +0,0 @@
from django.shortcuts import render
# Create your views here.

View file

@ -1,25 +1,26 @@
Django # Duh. Django
git+ # OIDC Provider git+
git+ # CAS Provider git+
git+ # SAML2 Provider git+
dbsettings # For settings in the database dbsettings
django-autosecretkey # For generating a secret key django-autosecretkey
cryptography # For working with key pairs cryptography
pysaml2 # For interacting with SAML2 pysaml2
ldaptor # For interacting with LDAP ldaptor
pyotp # For validating OTPs pyotp
django-timezone-field # For storing timezones django-timezone-field
django-phonenumber-field[phonenumbers] # For storing phone numbers django-phonenumber-field[phonenumbers]
django-annoying # AutoOneToOneField django-annoying
django-crispy-forms # For pretty forms django-crispy-forms
crispy-bootstrap4 # Bootstrap4 theme for crispy forms crispy-bootstrap4
pyqrcode # For generating QR codes pyqrcode
pypng # Needed by pyqrcode for PNG generation pypng
django-ajax-datatable # For pretty tables django-ajax-datatable
pyjwt # Working with JWTs pyjwt
pyrad # For everything RADIUS
django-cidrfield # For storing subnets # For MySQL:
mysqlclient # For using MySQL as database