Compare commits
No commits in common. "radius" and "main" have entirely different histories.
11 changed files with 29 additions and 374 deletions
|
@ -4,101 +4,19 @@ from django.contrib.auth import get_user_model
|
||||||
from annoying.fields import AutoOneToOneField
|
from annoying.fields import AutoOneToOneField
|
||||||
from pyotp import TOTP, random_base32
|
from pyotp import TOTP, random_base32
|
||||||
|
|
||||||
from datetime import datetime
|
|
||||||
|
|
||||||
|
|
||||||
class TOTPSecret(models.Model):
|
class TOTPSecret(models.Model):
|
||||||
"""A secret for a user's TOTP device
|
|
||||||
|
|
||||||
Attributes:
|
|
||||||
user: The user to whom this secret belongs
|
|
||||||
secret: The secret
|
|
||||||
active: Whether this secret is currently active
|
|
||||||
"""
|
|
||||||
|
|
||||||
user = AutoOneToOneField(get_user_model(), models.CASCADE, primary_key=True)
|
user = AutoOneToOneField(get_user_model(), models.CASCADE, primary_key=True)
|
||||||
secret = models.CharField(max_length=32, default=random_base32)
|
secret = models.CharField(max_length=32, default=random_base32)
|
||||||
active = models.BooleanField(default=False)
|
active = models.BooleanField(default=False)
|
||||||
|
|
||||||
def verify(self, token: str, window: int = 1, set_used=True) -> bool:
|
def verify(self, token):
|
||||||
"""Verify a TOTP token
|
return TOTP(self.secret).verify(token)
|
||||||
|
|
||||||
Args:
|
def get_uri(self):
|
||||||
token (str): The token to verify
|
return TOTP(self.secret).provisioning_uri(name=self.user.email, issuer_name="KumiDC")
|
||||||
window (int, optional): The number of tokens to check before and after the current token. Defaults to 1.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
bool: Whether the token was valid
|
|
||||||
"""
|
|
||||||
|
|
||||||
if not self.active:
|
|
||||||
return False
|
|
||||||
|
|
||||||
if UsedTOTPToken.is_used(self, token):
|
|
||||||
return False
|
|
||||||
|
|
||||||
if TOTP(self.secret).verify(token, valid_window=window):
|
|
||||||
if set_used:
|
|
||||||
UsedTOTPToken.objects.create(secret=self, token=token)
|
|
||||||
return True
|
|
||||||
|
|
||||||
return False
|
|
||||||
|
|
||||||
def get_uri(self) -> str:
|
|
||||||
"""Get the provisioning URI for this secret
|
|
||||||
|
|
||||||
This is used to generate a QR code for the user to scan with their TOTP device.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
str: The provisioning URI
|
|
||||||
"""
|
|
||||||
return TOTP(self.secret).provisioning_uri(
|
|
||||||
name=self.user.email, issuer_name="KumiDC"
|
|
||||||
)
|
|
||||||
|
|
||||||
def change_secret(self) -> str:
|
|
||||||
"""Change the secret for this user
|
|
||||||
|
|
||||||
This generates a new secret and saves it to the database.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
str: The new secret
|
|
||||||
"""
|
|
||||||
|
|
||||||
|
def change_secret(self):
|
||||||
self.secret = random_base32()
|
self.secret = random_base32()
|
||||||
self.save()
|
self.save()
|
||||||
return self.secret
|
return self.secret
|
||||||
|
|
||||||
|
|
||||||
class UsedTOTPToken(models.Model):
|
|
||||||
"""A TOTP token that has been used
|
|
||||||
|
|
||||||
We log these so that we can prevent replay attacks.
|
|
||||||
|
|
||||||
Attributes:
|
|
||||||
secret: The secret that was used
|
|
||||||
token: The token that was used
|
|
||||||
timestamp: When the token was used
|
|
||||||
expiry: How long we should keep this token for
|
|
||||||
"""
|
|
||||||
secret = models.ForeignKey(TOTPSecret, on_delete=models.CASCADE)
|
|
||||||
token = models.CharField(max_length=6)
|
|
||||||
timestamp = models.DateTimeField(auto_now_add=True)
|
|
||||||
expiry = models.DateTimeField()
|
|
||||||
|
|
||||||
def save(self, *args, **kwargs):
|
|
||||||
self.expiry = self.timestamp + timedelta(seconds=60)
|
|
||||||
super().save(*args, **kwargs)
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def is_used(cls, secret: TOTPSecret, token: str) -> bool:
|
|
||||||
"""Check whether a token has been used
|
|
||||||
|
|
||||||
Args:
|
|
||||||
secret (TOTPSecret): The secret that was used
|
|
||||||
token (str): The token that was used
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
bool: Whether the token has recently been used
|
|
||||||
"""
|
|
||||||
return cls.objects.filter(secret=secret, token=token, expiry__gte=datetime.now()).exists()
|
|
|
@ -1,3 +0,0 @@
|
||||||
from django.contrib import admin
|
|
||||||
|
|
||||||
# Register your models here.
|
|
|
@ -1,6 +0,0 @@
|
||||||
from django.apps import AppConfig
|
|
||||||
|
|
||||||
|
|
||||||
class RadiusConfig(AppConfig):
|
|
||||||
default_auto_field = "django.db.models.BigAutoField"
|
|
||||||
name = "radius"
|
|
|
@ -1,11 +0,0 @@
|
||||||
from radius.server import RadiusServer
|
|
||||||
|
|
||||||
from django.core.management.base import BaseCommand
|
|
||||||
|
|
||||||
class Command(BaseCommand):
|
|
||||||
help = "Run the RADIUS server"
|
|
||||||
|
|
||||||
def handle(self, *args, **kwargs):
|
|
||||||
# Set up and run the RADIUS server
|
|
||||||
srv = RadiusServer()
|
|
||||||
srv.Run()
|
|
|
@ -1,81 +0,0 @@
|
||||||
from django.db import models
|
|
||||||
from django.contrib.auth import get_user_model
|
|
||||||
|
|
||||||
from datetime import datetime, timedelta
|
|
||||||
|
|
||||||
from cidrfield.models import IPNetworkField
|
|
||||||
|
|
||||||
|
|
||||||
class RadiusChallengeChoice(models.TextChoices):
|
|
||||||
NONE = "none", "None"
|
|
||||||
TOTP = "totp", "Time-based One-time Password"
|
|
||||||
WEB = "web", "Web-based"
|
|
||||||
|
|
||||||
class RadiusUser(models.Model):
|
|
||||||
user = models.OneToOneField(get_user_model(), on_delete=models.CASCADE)
|
|
||||||
challenge = models.CharField(
|
|
||||||
max_length=255, choices=RadiusChallengeChoice.choices, default=RadiusChallengeChoice.NONE
|
|
||||||
)
|
|
||||||
|
|
||||||
class RadiusNetwork(models.Model):
|
|
||||||
name = models.CharField(max_length=255)
|
|
||||||
network = IPNetworkField()
|
|
||||||
|
|
||||||
|
|
||||||
class RadiusUserIP(models.Model):
|
|
||||||
user = models.ForeignKey(RadiusUser, on_delete=models.CASCADE)
|
|
||||||
network = models.ForeignKey(RadiusNetwork, on_delete=models.CASCADE)
|
|
||||||
ipv4_address = models.GenericIPAddressField(protocol="ipv4", null=True, blank=True)
|
|
||||||
ipv6_prefix = models.GenericIPAddressField(protocol="ipv6", null=True, blank=True)
|
|
||||||
|
|
||||||
class Meta:
|
|
||||||
unique_together = ("user", "network")
|
|
||||||
|
|
||||||
def save(self, *args, **kwargs):
|
|
||||||
if not self.ip_address in self.network.network:
|
|
||||||
raise ValueError(
|
|
||||||
f"IPv4 address {self.ip_address} not in network {self.network.network}"
|
|
||||||
)
|
|
||||||
if not self.ipv6_prefix in self.network.network:
|
|
||||||
raise ValueError(
|
|
||||||
f"IPv6 prefix {self.ipv6_prefix} not in network {self.network.network}"
|
|
||||||
)
|
|
||||||
|
|
||||||
super().save(*args, **kwargs)
|
|
||||||
|
|
||||||
|
|
||||||
class RadiusAccountingSession(models.Model):
|
|
||||||
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
|
|
||||||
|
|
||||||
user = models.ForeignKey(RadiusUser, on_delete=models.CASCADE)
|
|
||||||
ip = models.ForeignKey(RadiusUserIP, on_delete=models.CASCADE)
|
|
||||||
|
|
||||||
start_time = models.DateTimeField(null=True, blank=True)
|
|
||||||
stop_time = models.DateTimeField(null=True, blank=True)
|
|
||||||
|
|
||||||
def start(self):
|
|
||||||
self.start_time = datetime.now()
|
|
||||||
self.save()
|
|
||||||
|
|
||||||
def stop(self):
|
|
||||||
self.stop_time = datetime.now()
|
|
||||||
self.save()
|
|
||||||
|
|
||||||
|
|
||||||
class RadiusAccountingEvent(models.Model):
|
|
||||||
session = models.ForeignKey(RadiusAccountingSession, on_delete=models.CASCADE)
|
|
||||||
|
|
||||||
event_type = models.CharField(max_length=255)
|
|
||||||
event_time = models.DateTimeField(auto_now_add=True)
|
|
||||||
raw_data = models.TextField()
|
|
||||||
|
|
||||||
|
|
||||||
class RadiusWebAuthentication(models.Model):
|
|
||||||
user = models.ForeignKey(RadiusUser, on_delete=models.CASCADE)
|
|
||||||
timestamp = models.DateTimeField(auto_now_add=True)
|
|
||||||
expiry = models.DateTimeField()
|
|
||||||
ip = models.IPAddressField()
|
|
||||||
|
|
||||||
def save(self, *args, **kwargs):
|
|
||||||
self.expiry = self.timestamp + timedelta(minutes=5)
|
|
||||||
super().save(*args, **kwargs)
|
|
157
radius/server.py
157
radius/server.py
|
@ -1,157 +0,0 @@
|
||||||
from django.contrib.auth import authenticate, get_user_model
|
|
||||||
|
|
||||||
from pyrad import dictionary, packet, server
|
|
||||||
|
|
||||||
from radius.models import (
|
|
||||||
RadiusAccountingSession,
|
|
||||||
RadiusUser,
|
|
||||||
RadiusAccountingEvent,
|
|
||||||
RadiusWebAuthentication,
|
|
||||||
)
|
|
||||||
from authentication.helpers.otp import has_otp
|
|
||||||
from authentication.models import TOTPSecret
|
|
||||||
|
|
||||||
import uuid
|
|
||||||
|
|
||||||
|
|
||||||
class RadiusServer(server.Server):
|
|
||||||
def HandleAuthPacket(self, pkt):
|
|
||||||
"""Handle an authentication packet
|
|
||||||
|
|
||||||
Args:
|
|
||||||
pkt (packet.Packet): The packet to handle
|
|
||||||
"""
|
|
||||||
if state:
|
|
||||||
# Handle the challenge response
|
|
||||||
if self.validate_challenge_response(packet):
|
|
||||||
# Consider the challenge successfully passed
|
|
||||||
reply = self.CreateReplyPacket(pkt)
|
|
||||||
reply.code = packet.AccessAccept
|
|
||||||
else:
|
|
||||||
# Challenge failed
|
|
||||||
reply = self.CreateReplyPacket(pkt)
|
|
||||||
reply.code = packet.AccessReject
|
|
||||||
self.SendReplyPacket(pkt.fd, reply)
|
|
||||||
return
|
|
||||||
|
|
||||||
# No state implies this may be the first packet for authentication
|
|
||||||
# So we need to generate a challenge if necessary
|
|
||||||
|
|
||||||
# First check if the user exists
|
|
||||||
user = authenticate(username=username, password=password)
|
|
||||||
|
|
||||||
if user is None:
|
|
||||||
# User does not exist or password is incorrect
|
|
||||||
reply = self.CreateReplyPacket(pkt)
|
|
||||||
reply.code = packet.AccessReject
|
|
||||||
self.SendReplyPacket(pkt.fd, reply)
|
|
||||||
return
|
|
||||||
|
|
||||||
if user.radiususer.challenge == "none":
|
|
||||||
# No challenge is required
|
|
||||||
reply = self.CreateReplyPacket(pkt)
|
|
||||||
reply.code = packet.AccessAccept
|
|
||||||
self.SendReplyPacket(pkt.fd, reply)
|
|
||||||
return
|
|
||||||
|
|
||||||
# Generate a challenge
|
|
||||||
|
|
||||||
state = str(uuid.uuid4())
|
|
||||||
|
|
||||||
if user.radiususer.challenge == "totp":
|
|
||||||
if not has_otp(user):
|
|
||||||
# This user is not configured for TOTP...? # TODO: Make sure this is handled in the admin
|
|
||||||
reply = self.CreateReplyPacket(pkt)
|
|
||||||
reply.code = packet.AccessReject
|
|
||||||
self.SendReplyPacket(pkt.fd, reply)
|
|
||||||
return
|
|
||||||
|
|
||||||
challenge_reply = self.CreateReplyPacket(
|
|
||||||
pkt,
|
|
||||||
**{"State": state, "Reply-Message": "Please enter your TOTP token."}
|
|
||||||
)
|
|
||||||
|
|
||||||
challenge_reply.code = packet.AccessChallenge
|
|
||||||
self.SendReplyPacket(pkt.fd, challenge_reply)
|
|
||||||
return
|
|
||||||
|
|
||||||
if user.radiususer.challenge == "web":
|
|
||||||
challenge_reply = self.CreateReplyPacket(
|
|
||||||
pkt,
|
|
||||||
**{
|
|
||||||
"State": state,
|
|
||||||
"Reply-Message": "Please visit http://kumidc.local/auth to authenticate.", # TODO: Replace with a URL to our authentication page
|
|
||||||
}
|
|
||||||
)
|
|
||||||
challenge_reply.code = packet.AccessChallenge
|
|
||||||
self.SendReplyPacket(pkt.fd, challenge_reply)
|
|
||||||
return
|
|
||||||
|
|
||||||
# We should never get here. If we do, something has gone wrong.
|
|
||||||
reply = self.CreateReplyPacket(pkt)
|
|
||||||
reply.code = packet.AccessReject
|
|
||||||
self.SendReplyPacket(pkt.fd, reply)
|
|
||||||
|
|
||||||
def validate_challenge_response(self, packet, password):
|
|
||||||
state = packet["State"][0]
|
|
||||||
username = packet["User-Name"][0]
|
|
||||||
challenge_response = packet.get("User-Password", [None])[0]
|
|
||||||
|
|
||||||
user = get_user_model().objects.get(username=username)
|
|
||||||
|
|
||||||
if not user:
|
|
||||||
# This user... doesn't exist?
|
|
||||||
return False
|
|
||||||
|
|
||||||
if not user.radiususer.challenge:
|
|
||||||
# This user is not configured for challenge-response. Where did this response come from?
|
|
||||||
return False
|
|
||||||
|
|
||||||
if user.radiususer.challenge == "totp":
|
|
||||||
if not has_otp(user):
|
|
||||||
# This user is not configured for TOTP...
|
|
||||||
return False
|
|
||||||
|
|
||||||
totp = TOTPSecret.objects.get(user=user)
|
|
||||||
|
|
||||||
return totp.verify(
|
|
||||||
challenge_response
|
|
||||||
) # Returns True if the token is valid, False otherwise
|
|
||||||
|
|
||||||
if user.radiususer.challenge == "web":
|
|
||||||
ip = packet["NAS-IP-Address"][0]
|
|
||||||
auth = RadiusWebAuthentication.objects.filter(
|
|
||||||
user=user, ip=ip, expiry__gte=datetime.now()
|
|
||||||
)
|
|
||||||
auth.delete() # Delete any existing authentication sessions for this user
|
|
||||||
return True
|
|
||||||
|
|
||||||
def HandleAcctPacket(self, pkt: packet.Packet):
|
|
||||||
"""Handle an accounting packet
|
|
||||||
|
|
||||||
Args:
|
|
||||||
pkt (packet.Packet): The packet to handle
|
|
||||||
"""
|
|
||||||
status_type = pkt["Acct-Status-Type"][0]
|
|
||||||
username = pkt["User-Name"][0]
|
|
||||||
session_id = pkt["Acct-Session-Id"][0]
|
|
||||||
|
|
||||||
user = RadiusUser.objects.get(user__username=username)
|
|
||||||
|
|
||||||
session, created = RadiusAccountingSession.objects.get_or_create(
|
|
||||||
id=session_id,
|
|
||||||
defaults={
|
|
||||||
user: user,
|
|
||||||
},
|
|
||||||
)
|
|
||||||
|
|
||||||
if status_type == "Start":
|
|
||||||
session.start()
|
|
||||||
elif status_type == "Stop":
|
|
||||||
session.stop()
|
|
||||||
|
|
||||||
RadiusAccountingEvent.objects.create(
|
|
||||||
session=session, event_type=status_type, raw_data=pkt.packet
|
|
||||||
)
|
|
||||||
|
|
||||||
acct.save()
|
|
|
@ -1,3 +0,0 @@
|
||||||
from django.test import TestCase
|
|
||||||
|
|
||||||
# Create your tests here.
|
|
|
@ -1,3 +0,0 @@
|
||||||
from django.shortcuts import render
|
|
||||||
|
|
||||||
# Create your views here.
|
|
|
@ -1,25 +1,26 @@
|
||||||
Django # Duh.
|
Django
|
||||||
|
|
||||||
git+https://kumig.it/kumitterer/django-oidc-provider/ # OIDC Provider
|
git+https://kumig.it/kumitterer/django-oidc-provider/
|
||||||
git+https://kumig.it/kumitterer/django-cas-server/ # CAS Provider
|
git+https://kumig.it/kumitterer/django-cas-server/
|
||||||
git+https://github.com/OTA-Insight/djangosaml2idp/ # SAML2 Provider
|
git+https://github.com/OTA-Insight/djangosaml2idp/
|
||||||
|
|
||||||
dbsettings # For settings in the database
|
dbsettings
|
||||||
django-autosecretkey # For generating a secret key
|
django-autosecretkey
|
||||||
|
|
||||||
cryptography # For working with key pairs
|
cryptography
|
||||||
pysaml2 # For interacting with SAML2
|
pysaml2
|
||||||
ldaptor # For interacting with LDAP
|
ldaptor
|
||||||
pyotp # For validating OTPs
|
pyotp
|
||||||
django-timezone-field # For storing timezones
|
django-timezone-field
|
||||||
django-phonenumber-field[phonenumbers] # For storing phone numbers
|
django-phonenumber-field[phonenumbers]
|
||||||
django-annoying # AutoOneToOneField
|
django-annoying
|
||||||
django-crispy-forms # For pretty forms
|
django-crispy-forms
|
||||||
crispy-bootstrap4 # Bootstrap4 theme for crispy forms
|
crispy-bootstrap4
|
||||||
pyqrcode # For generating QR codes
|
pyqrcode
|
||||||
pypng # Needed by pyqrcode for PNG generation
|
pypng
|
||||||
django-ajax-datatable # For pretty tables
|
django-ajax-datatable
|
||||||
pyjwt # Working with JWTs
|
pyjwt
|
||||||
pyrad # For everything RADIUS
|
|
||||||
django-cidrfield # For storing subnets
|
# For MySQL:
|
||||||
mysqlclient # For using MySQL as database
|
|
||||||
|
mysqlclient
|
||||||
|
|
Loading…
Reference in a new issue