import json
from datetime import timedelta
from typing import List, Optional

from django.http import HttpRequest, JsonResponse
from django.utils import timezone
from django.views import View

from datastore.models import APIToken, APIUser


class BaseAPIView(View):
    """Base class for POST-only JSON API endpoints.

    Subclasses implement :meth:`handle_request` and return a ``dict``; this
    class parses the JSON body, optionally unwraps the ``"request"``
    envelope, checks the authentication token for non-public endpoints and
    serialises the result as a :class:`JsonResponse`.

    Class attributes subclasses may override:

    * ``unwrap_request`` -- read the payload from the ``"request"`` key of
      the body instead of using the body itself.
    * ``wrap_response`` -- nest the response under ``"<view name>Result"``.
    * ``public`` -- skip the authentication token check.
    * ``view_name`` -- explicit name used for the response wrapper key.
    """

    http_method_names: List[str] = ["post"]

    unwrap_request: bool = True
    wrap_response: bool = True
    public: bool = False

    # Defaults so that reading these attributes never raises AttributeError
    # on a subclass that does not set them.
    view_name: Optional[str] = None
    data: Optional[dict] = None
    token: Optional[APIToken] = None

    def get_view_name(self) -> str:
        """Return the name used to build the response wrapper key.

        Falls back from ``view_name`` to the last segment of the request
        path, and finally to the class name with its last four characters
        (the ``View`` suffix) removed.
        """
        # ``request.path`` excludes the query string, so it cannot leak into
        # the wrapper key.
        return (
            self.view_name
            or self.request.path.split("/")[-1]
            or self.__class__.__name__[:-4]
        )

    def wrapper(self, indict: dict, force: bool = False):
        """Return *indict* nested under ``"<view name>Result"``.

        The dict is returned unchanged when ``wrap_response`` is false and
        *force* is not set.
        """
        if self.wrap_response or force:
            return {f"{self.get_view_name()}Result": indict}
        return indict

    def authentication_error(self, message: str) -> JsonResponse:
        """Build the response sent when token authentication fails."""
        response_data: dict = {
            "Authentication_Approved": False,
            "Authentication_ReasonDenied": message,
        }
        return JsonResponse(self.wrapper(response_data))

    def error(self, message: str) -> dict:
        """Return an error payload for use as a handler's return value."""
        return {"ErrorText": message}

    def _bad_request(self, message: str) -> JsonResponse:
        """Build a 400 response for a body that cannot be processed."""
        return JsonResponse(self.wrapper(self.error(message)), status=400)

    def handle_request(self, request: HttpRequest, *args, **kwargs) -> dict:
        """Process the request and return the response payload.

        Must be implemented by subclasses. The parsed request payload is
        available as ``self.data``; for non-public views the matched token
        is available as ``self.token``.

        Raises:
            NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError(
            f"Class {self.__class__.__name__} does not implement handle_request()!")

    def post(self, request: HttpRequest, *args, **kwargs) -> JsonResponse:
        """Parse the body, authenticate if required, and run the handler.

        Returns a 400 response when the body is not valid JSON or does not
        hold a JSON object where one is expected, an authentication error
        response when the token check fails, and otherwise the handler's
        payload with ``Authentication_Approved`` set to ``True``.
        """
        try:
            payload = json.loads(request.body)
        except ValueError:
            # Covers json.JSONDecodeError and UnicodeDecodeError.
            return self._bad_request("Request body is not valid JSON")

        if not isinstance(payload, dict):
            return self._bad_request("Request body must be a JSON object")

        if self.unwrap_request:
            payload = payload.get("request")
            if not isinstance(payload, dict):
                return self._bad_request(
                    "Request body must contain a 'request' object")

        self.data = payload

        if not self.public:
            token_value = self.data.get("Authentication_Token")
            if not token_value:
                return self.authentication_error(
                    "Authentication token required for non-public API endpoint")

            # A token is valid only while its expiry lies in the future.
            token = APIToken.objects.filter(
                value=token_value, expiry__gt=timezone.now()).first()
            if token is None:
                return self.authentication_error(
                    "Authentication token does not exist or has expired")
            self.token = token

        response_data = self.handle_request(request, *args, **kwargs)
        # NOTE(review): this is also set on payloads produced by error();
        # confirm that clients expect it there.
        response_data["Authentication_Approved"] = True
        return JsonResponse(self.wrapper(response_data))


class BaseAuthenticationView(BaseAPIView):
    """Public endpoint that exchanges a login and password for an API token.

    The request body is used as-is (no ``"request"`` envelope) and the
    response is not wrapped.
    """

    unwrap_request = False
    wrap_response = False
    public = True

    def handle_request(self, request, *args, **kwargs) -> dict:
        """Validate the supplied credentials and create a token.

        Reads ``self.data["credentials"]``, which must contain ``Login`` and
        ``Password`` and may contain ``LifeTime`` (seconds, default 0).

        Returns:
            ``{"Authentication_Token": <token value>}`` on success, or an
            ``error()`` payload describing what was wrong.
        """
        credentials = self.data.get("credentials")
        if (
            not isinstance(credentials, dict)
            or "Login" not in credentials
            or "Password" not in credentials
        ):
            return self.error(
                "You need to pass credentials including Login and Password")

        user = APIUser.objects.filter(username=credentials["Login"]).first()
        if user is None or not user.check_password(credentials["Password"]):
            return self.error("Username or password incorrect")

        # NOTE(review): with the default LifeTime of 0 the token expires the
        # moment it is created; confirm whether a non-zero default is wanted.
        try:
            validity = timedelta(seconds=int(credentials.get("LifeTime", 0)))
            expiry = timezone.now() + validity
        except (TypeError, ValueError, OverflowError):
            return self.error("LifeTime must be a whole number of seconds")

        token = APIToken.objects.create(user=user, expiry=expiry)
        return {"Authentication_Token": token.value}