Compare commits

..

No commits in common. "radius" and "main" have entirely different histories.
radius ... main

11 changed files with 29 additions and 374 deletions

View file

@ -4,101 +4,19 @@ from django.contrib.auth import get_user_model
from annoying.fields import AutoOneToOneField
from pyotp import TOTP, random_base32
from datetime import datetime
class TOTPSecret(models.Model):
"""A secret for a user's TOTP device
Attributes:
user: The user to whom this secret belongs
secret: The secret
active: Whether this secret is currently active
"""
user = AutoOneToOneField(get_user_model(), models.CASCADE, primary_key=True)
secret = models.CharField(max_length=32, default=random_base32)
active = models.BooleanField(default=False)
def verify(self, token: str, window: int = 1, set_used=True) -> bool:
"""Verify a TOTP token
def verify(self, token):
return TOTP(self.secret).verify(token)
Args:
token (str): The token to verify
window (int, optional): The number of tokens to check before and after the current token. Defaults to 1.
Returns:
bool: Whether the token was valid
"""
if not self.active:
return False
if UsedTOTPToken.is_used(self, token):
return False
if TOTP(self.secret).verify(token, valid_window=window):
if set_used:
UsedTOTPToken.objects.create(secret=self, token=token)
return True
return False
def get_uri(self) -> str:
"""Get the provisioning URI for this secret
This is used to generate a QR code for the user to scan with their TOTP device.
Returns:
str: The provisioning URI
"""
return TOTP(self.secret).provisioning_uri(
name=self.user.email, issuer_name="KumiDC"
)
def change_secret(self) -> str:
"""Change the secret for this user
This generates a new secret and saves it to the database.
Returns:
str: The new secret
"""
def get_uri(self):
return TOTP(self.secret).provisioning_uri(name=self.user.email, issuer_name="KumiDC")
def change_secret(self):
self.secret = random_base32()
self.save()
return self.secret
class UsedTOTPToken(models.Model):
"""A TOTP token that has been used
We log these so that we can prevent replay attacks.
Attributes:
secret: The secret that was used
token: The token that was used
timestamp: When the token was used
expiry: How long we should keep this token for
"""
secret = models.ForeignKey(TOTPSecret, on_delete=models.CASCADE)
token = models.CharField(max_length=6)
timestamp = models.DateTimeField(auto_now_add=True)
expiry = models.DateTimeField()
def save(self, *args, **kwargs):
self.expiry = self.timestamp + timedelta(seconds=60)
super().save(*args, **kwargs)
@classmethod
def is_used(cls, secret: TOTPSecret, token: str) -> bool:
"""Check whether a token has been used
Args:
secret (TOTPSecret): The secret that was used
token (str): The token that was used
Returns:
bool: Whether the token has recently been used
"""
return cls.objects.filter(secret=secret, token=token, expiry__gte=datetime.now()).exists()
return self.secret

View file

View file

@ -1,3 +0,0 @@
from django.contrib import admin
# Register your models here.

View file

@ -1,6 +0,0 @@
from django.apps import AppConfig
class RadiusConfig(AppConfig):
default_auto_field = "django.db.models.BigAutoField"
name = "radius"

View file

@ -1,11 +0,0 @@
from radius.server import RadiusServer
from django.core.management.base import BaseCommand
class Command(BaseCommand):
help = "Run the RADIUS server"
def handle(self, *args, **kwargs):
# Set up and run the RADIUS server
srv = RadiusServer()
srv.Run()

View file

@ -1,81 +0,0 @@
from django.db import models
from django.contrib.auth import get_user_model
from datetime import datetime, timedelta
from cidrfield.models import IPNetworkField
class RadiusChallengeChoice(models.TextChoices):
NONE = "none", "None"
TOTP = "totp", "Time-based One-time Password"
WEB = "web", "Web-based"
class RadiusUser(models.Model):
user = models.OneToOneField(get_user_model(), on_delete=models.CASCADE)
challenge = models.CharField(
max_length=255, choices=RadiusChallengeChoice.choices, default=RadiusChallengeChoice.NONE
)
class RadiusNetwork(models.Model):
name = models.CharField(max_length=255)
network = IPNetworkField()
class RadiusUserIP(models.Model):
user = models.ForeignKey(RadiusUser, on_delete=models.CASCADE)
network = models.ForeignKey(RadiusNetwork, on_delete=models.CASCADE)
ipv4_address = models.GenericIPAddressField(protocol="ipv4", null=True, blank=True)
ipv6_prefix = models.GenericIPAddressField(protocol="ipv6", null=True, blank=True)
class Meta:
unique_together = ("user", "network")
def save(self, *args, **kwargs):
if not self.ip_address in self.network.network:
raise ValueError(
f"IPv4 address {self.ip_address} not in network {self.network.network}"
)
if not self.ipv6_prefix in self.network.network:
raise ValueError(
f"IPv6 prefix {self.ipv6_prefix} not in network {self.network.network}"
)
super().save(*args, **kwargs)
class RadiusAccountingSession(models.Model):
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
user = models.ForeignKey(RadiusUser, on_delete=models.CASCADE)
ip = models.ForeignKey(RadiusUserIP, on_delete=models.CASCADE)
start_time = models.DateTimeField(null=True, blank=True)
stop_time = models.DateTimeField(null=True, blank=True)
def start(self):
self.start_time = datetime.now()
self.save()
def stop(self):
self.stop_time = datetime.now()
self.save()
class RadiusAccountingEvent(models.Model):
session = models.ForeignKey(RadiusAccountingSession, on_delete=models.CASCADE)
event_type = models.CharField(max_length=255)
event_time = models.DateTimeField(auto_now_add=True)
raw_data = models.TextField()
class RadiusWebAuthentication(models.Model):
user = models.ForeignKey(RadiusUser, on_delete=models.CASCADE)
timestamp = models.DateTimeField(auto_now_add=True)
expiry = models.DateTimeField()
ip = models.IPAddressField()
def save(self, *args, **kwargs):
self.expiry = self.timestamp + timedelta(minutes=5)
super().save(*args, **kwargs)

View file

@ -1,157 +0,0 @@
from django.contrib.auth import authenticate, get_user_model
from pyrad import dictionary, packet, server
from radius.models import (
RadiusAccountingSession,
RadiusUser,
RadiusAccountingEvent,
RadiusWebAuthentication,
)
from authentication.helpers.otp import has_otp
from authentication.models import TOTPSecret
import uuid
class RadiusServer(server.Server):
def HandleAuthPacket(self, pkt):
"""Handle an authentication packet
Args:
pkt (packet.Packet): The packet to handle
"""
if state:
# Handle the challenge response
if self.validate_challenge_response(packet):
# Consider the challenge successfully passed
reply = self.CreateReplyPacket(pkt)
reply.code = packet.AccessAccept
else:
# Challenge failed
reply = self.CreateReplyPacket(pkt)
reply.code = packet.AccessReject
self.SendReplyPacket(pkt.fd, reply)
return
# No state implies this may be the first packet for authentication
# So we need to generate a challenge if necessary
# First check if the user exists
user = authenticate(username=username, password=password)
if user is None:
# User does not exist or password is incorrect
reply = self.CreateReplyPacket(pkt)
reply.code = packet.AccessReject
self.SendReplyPacket(pkt.fd, reply)
return
if user.radiususer.challenge == "none":
# No challenge is required
reply = self.CreateReplyPacket(pkt)
reply.code = packet.AccessAccept
self.SendReplyPacket(pkt.fd, reply)
return
# Generate a challenge
state = str(uuid.uuid4())
if user.radiususer.challenge == "totp":
if not has_otp(user):
# This user is not configured for TOTP...? # TODO: Make sure this is handled in the admin
reply = self.CreateReplyPacket(pkt)
reply.code = packet.AccessReject
self.SendReplyPacket(pkt.fd, reply)
return
challenge_reply = self.CreateReplyPacket(
pkt,
**{"State": state, "Reply-Message": "Please enter your TOTP token."}
)
challenge_reply.code = packet.AccessChallenge
self.SendReplyPacket(pkt.fd, challenge_reply)
return
if user.radiususer.challenge == "web":
challenge_reply = self.CreateReplyPacket(
pkt,
**{
"State": state,
"Reply-Message": "Please visit http://kumidc.local/auth to authenticate.", # TODO: Replace with a URL to our authentication page
}
)
challenge_reply.code = packet.AccessChallenge
self.SendReplyPacket(pkt.fd, challenge_reply)
return
# We should never get here. If we do, something has gone wrong.
reply = self.CreateReplyPacket(pkt)
reply.code = packet.AccessReject
self.SendReplyPacket(pkt.fd, reply)
def validate_challenge_response(self, packet, password):
state = packet["State"][0]
username = packet["User-Name"][0]
challenge_response = packet.get("User-Password", [None])[0]
user = get_user_model().objects.get(username=username)
if not user:
# This user... doesn't exist?
return False
if not user.radiususer.challenge:
# This user is not configured for challenge-response. Where did this response come from?
return False
if user.radiususer.challenge == "totp":
if not has_otp(user):
# This user is not configured for TOTP...
return False
totp = TOTPSecret.objects.get(user=user)
return totp.verify(
challenge_response
) # Returns True if the token is valid, False otherwise
if user.radiususer.challenge == "web":
ip = packet["NAS-IP-Address"][0]
auth = RadiusWebAuthentication.objects.filter(
user=user, ip=ip, expiry__gte=datetime.now()
)
auth.delete() # Delete any existing authentication sessions for this user
return True
def HandleAcctPacket(self, pkt: packet.Packet):
"""Handle an accounting packet
Args:
pkt (packet.Packet): The packet to handle
"""
status_type = pkt["Acct-Status-Type"][0]
username = pkt["User-Name"][0]
session_id = pkt["Acct-Session-Id"][0]
user = RadiusUser.objects.get(user__username=username)
session, created = RadiusAccountingSession.objects.get_or_create(
id=session_id,
defaults={
user: user,
},
)
if status_type == "Start":
session.start()
elif status_type == "Stop":
session.stop()
RadiusAccountingEvent.objects.create(
session=session, event_type=status_type, raw_data=pkt.packet
)
acct.save()

View file

@ -1,3 +0,0 @@
from django.test import TestCase
# Create your tests here.

View file

@ -1,3 +0,0 @@
from django.shortcuts import render
# Create your views here.

View file

@ -1,25 +1,26 @@
Django # Duh.
Django
git+https://kumig.it/kumitterer/django-oidc-provider/ # OIDC Provider
git+https://kumig.it/kumitterer/django-cas-server/ # CAS Provider
git+https://github.com/OTA-Insight/djangosaml2idp/ # SAML2 Provider
git+https://kumig.it/kumitterer/django-oidc-provider/
git+https://kumig.it/kumitterer/django-cas-server/
git+https://github.com/OTA-Insight/djangosaml2idp/
dbsettings # For settings in the database
django-autosecretkey # For generating a secret key
dbsettings
django-autosecretkey
cryptography # For working with key pairs
pysaml2 # For interacting with SAML2
ldaptor # For interacting with LDAP
pyotp # For validating OTPs
django-timezone-field # For storing timezones
django-phonenumber-field[phonenumbers] # For storing phone numbers
django-annoying # AutoOneToOneField
django-crispy-forms # For pretty forms
crispy-bootstrap4 # Bootstrap4 theme for crispy forms
pyqrcode # For generating QR codes
pypng # Needed by pyqrcode for PNG generation
django-ajax-datatable # For pretty tables
pyjwt # Working with JWTs
pyrad # For everything RADIUS
django-cidrfield # For storing subnets
mysqlclient # For using MySQL as database
cryptography
pysaml2
ldaptor
pyotp
django-timezone-field
django-phonenumber-field[phonenumbers]
django-annoying
django-crispy-forms
crispy-bootstrap4
pyqrcode
pypng
django-ajax-datatable
pyjwt
# For MySQL:
mysqlclient