Compare commits
No commits in common. "radius" and "main" have entirely different histories.
11 changed files with 29 additions and 374 deletions
|
@ -4,101 +4,19 @@ from django.contrib.auth import get_user_model
|
|||
from annoying.fields import AutoOneToOneField
|
||||
from pyotp import TOTP, random_base32
|
||||
|
||||
from datetime import datetime
|
||||
|
||||
|
||||
class TOTPSecret(models.Model):
|
||||
"""A secret for a user's TOTP device
|
||||
|
||||
Attributes:
|
||||
user: The user to whom this secret belongs
|
||||
secret: The secret
|
||||
active: Whether this secret is currently active
|
||||
"""
|
||||
|
||||
user = AutoOneToOneField(get_user_model(), models.CASCADE, primary_key=True)
|
||||
secret = models.CharField(max_length=32, default=random_base32)
|
||||
active = models.BooleanField(default=False)
|
||||
|
||||
def verify(self, token: str, window: int = 1, set_used=True) -> bool:
|
||||
"""Verify a TOTP token
|
||||
def verify(self, token):
|
||||
return TOTP(self.secret).verify(token)
|
||||
|
||||
Args:
|
||||
token (str): The token to verify
|
||||
window (int, optional): The number of tokens to check before and after the current token. Defaults to 1.
|
||||
|
||||
Returns:
|
||||
bool: Whether the token was valid
|
||||
"""
|
||||
|
||||
if not self.active:
|
||||
return False
|
||||
|
||||
if UsedTOTPToken.is_used(self, token):
|
||||
return False
|
||||
|
||||
if TOTP(self.secret).verify(token, valid_window=window):
|
||||
if set_used:
|
||||
UsedTOTPToken.objects.create(secret=self, token=token)
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def get_uri(self) -> str:
|
||||
"""Get the provisioning URI for this secret
|
||||
|
||||
This is used to generate a QR code for the user to scan with their TOTP device.
|
||||
|
||||
Returns:
|
||||
str: The provisioning URI
|
||||
"""
|
||||
return TOTP(self.secret).provisioning_uri(
|
||||
name=self.user.email, issuer_name="KumiDC"
|
||||
)
|
||||
|
||||
def change_secret(self) -> str:
|
||||
"""Change the secret for this user
|
||||
|
||||
This generates a new secret and saves it to the database.
|
||||
|
||||
Returns:
|
||||
str: The new secret
|
||||
"""
|
||||
def get_uri(self):
|
||||
return TOTP(self.secret).provisioning_uri(name=self.user.email, issuer_name="KumiDC")
|
||||
|
||||
def change_secret(self):
|
||||
self.secret = random_base32()
|
||||
self.save()
|
||||
return self.secret
|
||||
|
||||
|
||||
class UsedTOTPToken(models.Model):
|
||||
"""A TOTP token that has been used
|
||||
|
||||
We log these so that we can prevent replay attacks.
|
||||
|
||||
Attributes:
|
||||
secret: The secret that was used
|
||||
token: The token that was used
|
||||
timestamp: When the token was used
|
||||
expiry: How long we should keep this token for
|
||||
"""
|
||||
secret = models.ForeignKey(TOTPSecret, on_delete=models.CASCADE)
|
||||
token = models.CharField(max_length=6)
|
||||
timestamp = models.DateTimeField(auto_now_add=True)
|
||||
expiry = models.DateTimeField()
|
||||
|
||||
def save(self, *args, **kwargs):
|
||||
self.expiry = self.timestamp + timedelta(seconds=60)
|
||||
super().save(*args, **kwargs)
|
||||
|
||||
@classmethod
|
||||
def is_used(cls, secret: TOTPSecret, token: str) -> bool:
|
||||
"""Check whether a token has been used
|
||||
|
||||
Args:
|
||||
secret (TOTPSecret): The secret that was used
|
||||
token (str): The token that was used
|
||||
|
||||
Returns:
|
||||
bool: Whether the token has recently been used
|
||||
"""
|
||||
return cls.objects.filter(secret=secret, token=token, expiry__gte=datetime.now()).exists()
|
|
@ -1,3 +0,0 @@
|
|||
from django.contrib import admin
|
||||
|
||||
# Register your models here.
|
|
@ -1,6 +0,0 @@
|
|||
from django.apps import AppConfig
|
||||
|
||||
|
||||
class RadiusConfig(AppConfig):
|
||||
default_auto_field = "django.db.models.BigAutoField"
|
||||
name = "radius"
|
|
@ -1,11 +0,0 @@
|
|||
from radius.server import RadiusServer
|
||||
|
||||
from django.core.management.base import BaseCommand
|
||||
|
||||
class Command(BaseCommand):
|
||||
help = "Run the RADIUS server"
|
||||
|
||||
def handle(self, *args, **kwargs):
|
||||
# Set up and run the RADIUS server
|
||||
srv = RadiusServer()
|
||||
srv.Run()
|
|
@ -1,81 +0,0 @@
|
|||
from django.db import models
|
||||
from django.contrib.auth import get_user_model
|
||||
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
from cidrfield.models import IPNetworkField
|
||||
|
||||
|
||||
class RadiusChallengeChoice(models.TextChoices):
|
||||
NONE = "none", "None"
|
||||
TOTP = "totp", "Time-based One-time Password"
|
||||
WEB = "web", "Web-based"
|
||||
|
||||
class RadiusUser(models.Model):
|
||||
user = models.OneToOneField(get_user_model(), on_delete=models.CASCADE)
|
||||
challenge = models.CharField(
|
||||
max_length=255, choices=RadiusChallengeChoice.choices, default=RadiusChallengeChoice.NONE
|
||||
)
|
||||
|
||||
class RadiusNetwork(models.Model):
|
||||
name = models.CharField(max_length=255)
|
||||
network = IPNetworkField()
|
||||
|
||||
|
||||
class RadiusUserIP(models.Model):
|
||||
user = models.ForeignKey(RadiusUser, on_delete=models.CASCADE)
|
||||
network = models.ForeignKey(RadiusNetwork, on_delete=models.CASCADE)
|
||||
ipv4_address = models.GenericIPAddressField(protocol="ipv4", null=True, blank=True)
|
||||
ipv6_prefix = models.GenericIPAddressField(protocol="ipv6", null=True, blank=True)
|
||||
|
||||
class Meta:
|
||||
unique_together = ("user", "network")
|
||||
|
||||
def save(self, *args, **kwargs):
|
||||
if not self.ip_address in self.network.network:
|
||||
raise ValueError(
|
||||
f"IPv4 address {self.ip_address} not in network {self.network.network}"
|
||||
)
|
||||
if not self.ipv6_prefix in self.network.network:
|
||||
raise ValueError(
|
||||
f"IPv6 prefix {self.ipv6_prefix} not in network {self.network.network}"
|
||||
)
|
||||
|
||||
super().save(*args, **kwargs)
|
||||
|
||||
|
||||
class RadiusAccountingSession(models.Model):
|
||||
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
|
||||
|
||||
user = models.ForeignKey(RadiusUser, on_delete=models.CASCADE)
|
||||
ip = models.ForeignKey(RadiusUserIP, on_delete=models.CASCADE)
|
||||
|
||||
start_time = models.DateTimeField(null=True, blank=True)
|
||||
stop_time = models.DateTimeField(null=True, blank=True)
|
||||
|
||||
def start(self):
|
||||
self.start_time = datetime.now()
|
||||
self.save()
|
||||
|
||||
def stop(self):
|
||||
self.stop_time = datetime.now()
|
||||
self.save()
|
||||
|
||||
|
||||
class RadiusAccountingEvent(models.Model):
|
||||
session = models.ForeignKey(RadiusAccountingSession, on_delete=models.CASCADE)
|
||||
|
||||
event_type = models.CharField(max_length=255)
|
||||
event_time = models.DateTimeField(auto_now_add=True)
|
||||
raw_data = models.TextField()
|
||||
|
||||
|
||||
class RadiusWebAuthentication(models.Model):
|
||||
user = models.ForeignKey(RadiusUser, on_delete=models.CASCADE)
|
||||
timestamp = models.DateTimeField(auto_now_add=True)
|
||||
expiry = models.DateTimeField()
|
||||
ip = models.IPAddressField()
|
||||
|
||||
def save(self, *args, **kwargs):
|
||||
self.expiry = self.timestamp + timedelta(minutes=5)
|
||||
super().save(*args, **kwargs)
|
157
radius/server.py
157
radius/server.py
|
@ -1,157 +0,0 @@
|
|||
from django.contrib.auth import authenticate, get_user_model
|
||||
|
||||
from pyrad import dictionary, packet, server
|
||||
|
||||
from radius.models import (
|
||||
RadiusAccountingSession,
|
||||
RadiusUser,
|
||||
RadiusAccountingEvent,
|
||||
RadiusWebAuthentication,
|
||||
)
|
||||
from authentication.helpers.otp import has_otp
|
||||
from authentication.models import TOTPSecret
|
||||
|
||||
import uuid
|
||||
|
||||
|
||||
class RadiusServer(server.Server):
|
||||
def HandleAuthPacket(self, pkt):
|
||||
"""Handle an authentication packet
|
||||
|
||||
Args:
|
||||
pkt (packet.Packet): The packet to handle
|
||||
"""
|
||||
if state:
|
||||
# Handle the challenge response
|
||||
if self.validate_challenge_response(packet):
|
||||
# Consider the challenge successfully passed
|
||||
reply = self.CreateReplyPacket(pkt)
|
||||
reply.code = packet.AccessAccept
|
||||
else:
|
||||
# Challenge failed
|
||||
reply = self.CreateReplyPacket(pkt)
|
||||
reply.code = packet.AccessReject
|
||||
self.SendReplyPacket(pkt.fd, reply)
|
||||
return
|
||||
|
||||
# No state implies this may be the first packet for authentication
|
||||
# So we need to generate a challenge if necessary
|
||||
|
||||
# First check if the user exists
|
||||
user = authenticate(username=username, password=password)
|
||||
|
||||
if user is None:
|
||||
# User does not exist or password is incorrect
|
||||
reply = self.CreateReplyPacket(pkt)
|
||||
reply.code = packet.AccessReject
|
||||
self.SendReplyPacket(pkt.fd, reply)
|
||||
return
|
||||
|
||||
if user.radiususer.challenge == "none":
|
||||
# No challenge is required
|
||||
reply = self.CreateReplyPacket(pkt)
|
||||
reply.code = packet.AccessAccept
|
||||
self.SendReplyPacket(pkt.fd, reply)
|
||||
return
|
||||
|
||||
# Generate a challenge
|
||||
|
||||
state = str(uuid.uuid4())
|
||||
|
||||
if user.radiususer.challenge == "totp":
|
||||
if not has_otp(user):
|
||||
# This user is not configured for TOTP...? # TODO: Make sure this is handled in the admin
|
||||
reply = self.CreateReplyPacket(pkt)
|
||||
reply.code = packet.AccessReject
|
||||
self.SendReplyPacket(pkt.fd, reply)
|
||||
return
|
||||
|
||||
challenge_reply = self.CreateReplyPacket(
|
||||
pkt,
|
||||
**{"State": state, "Reply-Message": "Please enter your TOTP token."}
|
||||
)
|
||||
|
||||
challenge_reply.code = packet.AccessChallenge
|
||||
self.SendReplyPacket(pkt.fd, challenge_reply)
|
||||
return
|
||||
|
||||
if user.radiususer.challenge == "web":
|
||||
challenge_reply = self.CreateReplyPacket(
|
||||
pkt,
|
||||
**{
|
||||
"State": state,
|
||||
"Reply-Message": "Please visit http://kumidc.local/auth to authenticate.", # TODO: Replace with a URL to our authentication page
|
||||
}
|
||||
)
|
||||
challenge_reply.code = packet.AccessChallenge
|
||||
self.SendReplyPacket(pkt.fd, challenge_reply)
|
||||
return
|
||||
|
||||
# We should never get here. If we do, something has gone wrong.
|
||||
reply = self.CreateReplyPacket(pkt)
|
||||
reply.code = packet.AccessReject
|
||||
self.SendReplyPacket(pkt.fd, reply)
|
||||
|
||||
def validate_challenge_response(self, packet, password):
|
||||
state = packet["State"][0]
|
||||
username = packet["User-Name"][0]
|
||||
challenge_response = packet.get("User-Password", [None])[0]
|
||||
|
||||
user = get_user_model().objects.get(username=username)
|
||||
|
||||
if not user:
|
||||
# This user... doesn't exist?
|
||||
return False
|
||||
|
||||
if not user.radiususer.challenge:
|
||||
# This user is not configured for challenge-response. Where did this response come from?
|
||||
return False
|
||||
|
||||
if user.radiususer.challenge == "totp":
|
||||
if not has_otp(user):
|
||||
# This user is not configured for TOTP...
|
||||
return False
|
||||
|
||||
totp = TOTPSecret.objects.get(user=user)
|
||||
|
||||
return totp.verify(
|
||||
challenge_response
|
||||
) # Returns True if the token is valid, False otherwise
|
||||
|
||||
if user.radiususer.challenge == "web":
|
||||
ip = packet["NAS-IP-Address"][0]
|
||||
auth = RadiusWebAuthentication.objects.filter(
|
||||
user=user, ip=ip, expiry__gte=datetime.now()
|
||||
)
|
||||
auth.delete() # Delete any existing authentication sessions for this user
|
||||
return True
|
||||
|
||||
def HandleAcctPacket(self, pkt: packet.Packet):
|
||||
"""Handle an accounting packet
|
||||
|
||||
Args:
|
||||
pkt (packet.Packet): The packet to handle
|
||||
"""
|
||||
status_type = pkt["Acct-Status-Type"][0]
|
||||
username = pkt["User-Name"][0]
|
||||
session_id = pkt["Acct-Session-Id"][0]
|
||||
|
||||
user = RadiusUser.objects.get(user__username=username)
|
||||
|
||||
session, created = RadiusAccountingSession.objects.get_or_create(
|
||||
id=session_id,
|
||||
defaults={
|
||||
user: user,
|
||||
},
|
||||
)
|
||||
|
||||
if status_type == "Start":
|
||||
session.start()
|
||||
elif status_type == "Stop":
|
||||
session.stop()
|
||||
|
||||
RadiusAccountingEvent.objects.create(
|
||||
session=session, event_type=status_type, raw_data=pkt.packet
|
||||
)
|
||||
|
||||
acct.save()
|
|
@ -1,3 +0,0 @@
|
|||
from django.test import TestCase
|
||||
|
||||
# Create your tests here.
|
|
@ -1,3 +0,0 @@
|
|||
from django.shortcuts import render
|
||||
|
||||
# Create your views here.
|
|
@ -1,25 +1,26 @@
|
|||
Django # Duh.
|
||||
Django
|
||||
|
||||
git+https://kumig.it/kumitterer/django-oidc-provider/ # OIDC Provider
|
||||
git+https://kumig.it/kumitterer/django-cas-server/ # CAS Provider
|
||||
git+https://github.com/OTA-Insight/djangosaml2idp/ # SAML2 Provider
|
||||
git+https://kumig.it/kumitterer/django-oidc-provider/
|
||||
git+https://kumig.it/kumitterer/django-cas-server/
|
||||
git+https://github.com/OTA-Insight/djangosaml2idp/
|
||||
|
||||
dbsettings # For settings in the database
|
||||
django-autosecretkey # For generating a secret key
|
||||
dbsettings
|
||||
django-autosecretkey
|
||||
|
||||
cryptography # For working with key pairs
|
||||
pysaml2 # For interacting with SAML2
|
||||
ldaptor # For interacting with LDAP
|
||||
pyotp # For validating OTPs
|
||||
django-timezone-field # For storing timezones
|
||||
django-phonenumber-field[phonenumbers] # For storing phone numbers
|
||||
django-annoying # AutoOneToOneField
|
||||
django-crispy-forms # For pretty forms
|
||||
crispy-bootstrap4 # Bootstrap4 theme for crispy forms
|
||||
pyqrcode # For generating QR codes
|
||||
pypng # Needed by pyqrcode for PNG generation
|
||||
django-ajax-datatable # For pretty tables
|
||||
pyjwt # Working with JWTs
|
||||
pyrad # For everything RADIUS
|
||||
django-cidrfield # For storing subnets
|
||||
mysqlclient # For using MySQL as database
|
||||
cryptography
|
||||
pysaml2
|
||||
ldaptor
|
||||
pyotp
|
||||
django-timezone-field
|
||||
django-phonenumber-field[phonenumbers]
|
||||
django-annoying
|
||||
django-crispy-forms
|
||||
crispy-bootstrap4
|
||||
pyqrcode
|
||||
pypng
|
||||
django-ajax-datatable
|
||||
pyjwt
|
||||
|
||||
# For MySQL:
|
||||
|
||||
mysqlclient
|
||||
|
|
Loading…
Reference in a new issue