290 lines
No EOL
9.6 KiB
Python
290 lines
No EOL
9.6 KiB
Python
from django.views import View
|
|
from django.http import JsonResponse
|
|
from django.forms.models import model_to_dict
|
|
from django.utils.decorators import method_decorator
|
|
from django.views.decorators.csrf import csrf_exempt
|
|
from django.core.validators import URLValidator
|
|
from django.core.exceptions import ValidationError
|
|
|
|
import json
|
|
import uuid
|
|
|
|
from PIL import Image
|
|
from io import BytesIO
|
|
|
|
import urllib.request
|
|
|
|
import boto3
|
|
|
|
from core.models import Series, APIKey, Code, Media, Content
|
|
from customsettings import AWS_ACCESS_KEY_ID, AWS_BUCKET, AWS_SECRET_ACCESS_KEY
|
|
from converter import convert
|
|
from reader import read_code
|
|
|
|
class APIView(View):
|
|
@method_decorator(csrf_exempt)
|
|
def dispatch(self, request, *args, **kwargs):
|
|
try:
|
|
APIKey.objects.get(key=request.headers["apikey"])
|
|
except APIKey.DoesNotExist:
|
|
return JsonResponse({"error": "No valid API key provided"})
|
|
return super().dispatch(request, *args, **kwargs)
|
|
|
|
class JsonView(APIView):
|
|
@method_decorator(csrf_exempt)
|
|
def dispatch(self, request, *args, **kwargs):
|
|
try:
|
|
self.data = json.loads(request.body.decode("utf-8"))
|
|
except Exception:
|
|
return JsonResponse({"error": "No valid json found"})
|
|
return super().dispatch(request, *args, **kwargs)
|
|
|
|
class APICreateSeries(JsonView):
|
|
def post(self, request, *args, **kwargs):
|
|
try:
|
|
title = self.data["title"]
|
|
except KeyError:
|
|
return JsonResponse({"error": "No title for Series provided"})
|
|
|
|
description = self.data.get("description")
|
|
id = self.data.get("id")
|
|
uuid = self.data.get("uuid")
|
|
|
|
if id:
|
|
try:
|
|
Series.objects.get(id=id)
|
|
raise ValueError("Series object with id %s exists" % id)
|
|
except Series.DoesNotExist:
|
|
pass
|
|
|
|
if uuid:
|
|
try:
|
|
Series.objects.get(id=uuid)
|
|
raise ValueError("Series object with uuid %s exists" % uuid)
|
|
except Series.DoesNotExist:
|
|
pass
|
|
|
|
series = Series.objects.create(uuid=uuid, id=id, title=title, description=description)
|
|
|
|
return JsonResponse(model_to_dict(series))
|
|
|
|
class SeriesAPIView(APIView):
|
|
@method_decorator(csrf_exempt)
|
|
def dispatch(self, request, *args, **kwargs):
|
|
try:
|
|
try:
|
|
is_uuid = bool(uuid.UUID(kwargs["id"]))
|
|
except:
|
|
is_uuid = False
|
|
|
|
if is_uuid:
|
|
self.series = Series.objects.get(uuid=kwargs["id"])
|
|
else:
|
|
self.series = Series.objects.get(id=kwargs["id"])
|
|
|
|
except Series.DoesNotExist:
|
|
return JsonResponse({"error": "No Series with ID %s found" % kwargs["id"]})
|
|
|
|
return super().dispatch(request, *args, **kwargs)
|
|
|
|
class APISeriesByIDDispatcher(SeriesAPIView):
|
|
def get(self, request, *args, **kwargs):
|
|
return JsonResponse(model_to_dict(self.series))
|
|
|
|
def post(self, request, *args, **kwargs):
|
|
try:
|
|
data = json.loads(request.body.decode("utf-8"))
|
|
except Exception:
|
|
return JsonResponse({"error": "No valid json found"})
|
|
|
|
title = data.get("title", self.series.title)
|
|
description = data.get("description", self.series.description)
|
|
id = data.get("id", self.series.id)
|
|
uuid = data.get("uuid", self.series.uuid)
|
|
|
|
if id != self.series.id:
|
|
return JsonResponse({"error": "Cannot change Series ID"})
|
|
|
|
if uuid != self.series.uuid:
|
|
return JsonResponse({"error": "Cannot change Series UUID"})
|
|
|
|
self.series.title = title
|
|
self.series.description = description
|
|
self.series.save()
|
|
|
|
return JsonResponse(model_to_dict(self.series))
|
|
|
|
def delete(self, request, *args, **kwargs):
|
|
self.series.delete()
|
|
return JsonResponse({})
|
|
|
|
class APICodesBySeriesIDDispatcher(SeriesAPIView):
|
|
def get(self, request, *args, **kwargs):
|
|
return JsonResponse(list(self.series.code_set.values()), safe=False)
|
|
|
|
def post(self, request, *args, **kwargs):
|
|
try:
|
|
data = json.loads(request.body.decode("utf-8"))
|
|
except Exception:
|
|
return JsonResponse({"error": "No valid json found"})
|
|
|
|
objects = []
|
|
|
|
for spec in data:
|
|
title = spec["title"]
|
|
id = spec.get("id")
|
|
uuid = spec.get("uuid")
|
|
order = 0
|
|
try:
|
|
code = Code(uuid=uuid, id=id, title=title, series=self.series, order=order)
|
|
code.save()
|
|
objects.append(model_to_dict(code))
|
|
except Exception as e:
|
|
for o in objects:
|
|
o.delete()
|
|
return JsonResponse({"error": "Failed creating Code objects: %s" % e})
|
|
|
|
return JsonResponse(objects, safe=False)
|
|
|
|
class APICreateCodes(APIView):
|
|
def post(self, request, *args, **kwargs):
|
|
try:
|
|
data = json.loads(request.body.decode("utf-8"))
|
|
except Exception:
|
|
return JsonResponse({"error": "No valid json found"})
|
|
|
|
objects = []
|
|
|
|
for spec in data:
|
|
title = spec["title"]
|
|
id = spec.get("id")
|
|
nuuid = spec.get("uuid")
|
|
series_id = spec["series"]
|
|
order = 0
|
|
|
|
try:
|
|
is_uuid = bool(uuid.UUID(series_id))
|
|
except:
|
|
is_uuid = False
|
|
|
|
if is_uuid:
|
|
series = Series.objects.get(uuid=series_id)
|
|
else:
|
|
series = Series.objects.get(id=series_id)
|
|
|
|
try:
|
|
code = Code(uuid=nuuid, id=id, title=title, series=series, order=order)
|
|
code.save()
|
|
objects.append(model_to_dict(code))
|
|
|
|
except Exception as e:
|
|
for o in objects:
|
|
o.delete()
|
|
return JsonResponse({"error": "Failed creating Code objects: %s" % e})
|
|
|
|
return JsonResponse(objects, safe=False)
|
|
|
|
class CodeAPIView(APIView):
|
|
@method_decorator(csrf_exempt)
|
|
def dispatch(self, request, *args, **kwargs):
|
|
try:
|
|
if kwargs.get("uuid"):
|
|
self.code = Code.objects.get(uuid=kwargs.get("uuid"))
|
|
else:
|
|
self.code = Code.objects.get(id=kwargs["codeid"], series=Series.objects.get(id=kwargs["seriesid"]))
|
|
|
|
except Code.DoesNotExist:
|
|
return JsonResponse({"error": "No Code for this definition found"})
|
|
|
|
return super().dispatch(request, *args, **kwargs)
|
|
|
|
class APICodeByID(CodeAPIView):
|
|
def get(self, request, *args, **kwargs):
|
|
return JsonResponse(model_to_dict(self.code))
|
|
|
|
def post(self, request, *args, **kwargs):
|
|
try:
|
|
data = json.loads(request.body.decode("utf-8"))
|
|
except Exception:
|
|
return JsonResponse({"error": "No valid json found"})
|
|
|
|
title = data.get("title", self.code.title)
|
|
id = data.get("id", self.code.id)
|
|
uuid = data.get("uuid", self.code.uuid)
|
|
|
|
if id != self.code.id:
|
|
return JsonResponse({"error": "Cannot change Code ID"})
|
|
|
|
if uuid != self.code.uuid:
|
|
return JsonResponse({"error": "Cannot change Code UUID"})
|
|
|
|
self.code.title = title
|
|
self.code.save()
|
|
|
|
return JsonResponse(model_to_dict(self.code))
|
|
|
|
def delete(self, request, *args, **kwargs):
|
|
self.code.delete()
|
|
return JsonResponse({})
|
|
|
|
class APIAnalyzeContent(APIView):
|
|
def post(self, request, *args, **kwargs):
|
|
data = json.loads(request.body.decode("utf-8"))
|
|
|
|
if data.get("url"):
|
|
try:
|
|
URLValidator()(data["url"])
|
|
content = BytesIO(urllib.request.urlopen(data["url"]).read())
|
|
extension = "." + data["url"].split(".")[-1]
|
|
except:
|
|
return JsonResponse({"error": "Failed to retrieve file %s" % data["url"]})
|
|
|
|
else:
|
|
try:
|
|
session = boto3.Session(aws_access_key_id=AWS_ACCESS_KEY_ID, aws_secret_access_key=AWS_SECRET_ACCESS_KEY)
|
|
s3 = session.resource("s3")
|
|
bucket = s3.Bucket(AWS_BUCKET)
|
|
content = BytesIO()
|
|
bucket.download_fileobj(data["path"], content)
|
|
extension = "." + data["path"].split(".")[-1]
|
|
except:
|
|
return JsonResponse({"error": "Failed to retrieve file %s" % data["path"]})
|
|
|
|
content.seek(0)
|
|
|
|
down = convert(content, extension)
|
|
|
|
literal = read_code(down)
|
|
|
|
if literal:
|
|
code = None
|
|
|
|
if len(literal.split(":")) == 3:
|
|
seriesid = literal.split(":")[1]
|
|
codeid = literal.split(":")[2]
|
|
try:
|
|
code = Code.objects.get(id=codeid, series=Series.objects.get(id=seriesid))
|
|
parsed = code.title
|
|
except (Code.DoesNotExist, Series.DoesNotExist):
|
|
return JsonResponse({"error": "No code found for Series %s Code %s" % (seriesid, codeid)})
|
|
|
|
else:
|
|
parsed = ":".join(literal.split(":")[1:])
|
|
|
|
content = Content.objects.create(literal=literal)
|
|
media = Media.objects.create(content=content)
|
|
|
|
output = {
|
|
"uuid": media.uuid,
|
|
"literal": media.content.literal
|
|
}
|
|
|
|
if code:
|
|
output["parsed"] = parsed
|
|
output["code"] = model_to_dict(code)
|
|
output["code"]["series"] = model_to_dict(code.series)
|
|
|
|
else:
|
|
output = {"error": "No code found in provided image"}
|
|
|
|
return JsonResponse(output) |