pyadonis/classes/adonis.py
2022-06-22 11:20:34 +02:00

114 lines
3.5 KiB
Python

from urllib.request import urlopen, Request, urljoin
from base64 import b64decode
import json
from classes.exceptions import AdonisAuthenticationDeniedError, AdonisError
from classes.crew import Crew
from const import USER_AGENT
from PIL.Image import Image
# Values accepted by the ``api`` argument of Adonis.request(): each one
# selects which of the two configured base URLs a call is sent to.
CREW_PORTAL, INTEGRATION = 0, 1
class Adonis:
    """JSON-over-HTTP client for the crew-portal and integration APIs.

    Every call is sent to one of two base URLs, chosen by the ``api``
    argument (``CREW_PORTAL`` or ``INTEGRATION``). The stored login and
    password are used to obtain an authentication token for requests that
    need one.
    """

    def __init__(
        self,
        crew_portal_base_url: str,
        integration_base_url: str,
        login: str,
        password: str,
    ):
        """Store the two base URLs and the credentials; performs no I/O."""
        self.crew_portal_base_url = crew_portal_base_url
        self.integration_base_url = integration_base_url
        self.login = login
        self.password = password

    @classmethod
    def fromConfig(cls, config):
        """Build a client from an object exposing the four settings.

        ``config`` must provide the attributes ``crew_portal_base_url``,
        ``integration_base_url``, ``login`` and ``password``.
        """
        return cls(
            config.crew_portal_base_url,
            config.integration_base_url,
            config.login,
            config.password,
        )

    def request(
        self,
        endpoint: str,
        payload: dict,
        add_token: bool = True,
        wrap_request: bool = True,
        extract_response: bool = True,
        api: int = CREW_PORTAL,
    ):
        """Send ``payload`` as JSON to ``endpoint`` and return the decoded reply.

        Args:
            endpoint: Path joined onto the selected base URL. It is also the
                prefix of the ``<endpoint>Result`` key read from the reply.
            payload: Request body. It is never modified; the token is added
                to a copy.
            add_token: When true, fetch a token with ``getToken`` and send it
                as ``Authentication_Token``.
            wrap_request: When true, nest the payload under a ``"request"``
                key before sending.
            extract_response: When true, return only the ``<endpoint>Result``
                member of the reply. When false, the status checks below are
                applied to the top-level reply object instead.
            api: ``CREW_PORTAL`` selects the crew-portal base URL; any other
                value selects the integration base URL.

        Returns:
            The decoded JSON object (or its ``<endpoint>Result`` member).

        Raises:
            AdonisAuthenticationDeniedError: ``Authentication_Approved`` is
                falsy in the reply.
            AdonisError: ``ErrorText`` is non-empty in the reply.
            KeyError: An expected key is missing from the reply.

        Errors raised by ``urlopen`` or the JSON decoder propagate unchanged.
        """
        if api == CREW_PORTAL:
            base_url = self.crew_portal_base_url
        else:
            base_url = self.integration_base_url

        req = Request(urljoin(base_url, endpoint))
        req.add_header("User-Agent", USER_AGENT)
        req.add_header("Content-Type", "application/json")
        req.add_header("Accept", "application/json")

        if add_token:
            # Build a new dict instead of assigning into ``payload`` so the
            # caller's object is left untouched and can be reused safely.
            payload = {**payload, "Authentication_Token": self.getToken(api=api)}
        if wrap_request:
            payload = {"request": payload}
        req.data = json.dumps(payload).encode()

        # The context manager closes the HTTP response even if reading or
        # decoding fails.
        with urlopen(req) as response:
            res = json.loads(response.read())

        if extract_response:
            res = res[endpoint + "Result"]
        if not res["Authentication_Approved"]:
            raise AdonisAuthenticationDeniedError(
                res["Authentication_ReasonDenied"])
        if res["ErrorText"]:
            raise AdonisError(res["ErrorText"])
        return res

    def requestIntegration(
        self,
        endpoint: str,
        payload: dict,
        add_token: bool = True,
        wrap_request: bool = True,
        extract_response: bool = True,
    ):
        """Call ``request`` against the integration base URL."""
        return self.request(
            endpoint, payload, add_token, wrap_request, extract_response,
            INTEGRATION)

    def requestCrewPortal(
        self,
        endpoint: str,
        payload: dict,
        add_token: bool = True,
        wrap_request: bool = True,
        extract_response: bool = True,
    ):
        """Call ``request`` against the crew-portal base URL."""
        return self.request(
            endpoint, payload, add_token, wrap_request, extract_response,
            CREW_PORTAL)

    def getToken(self, lifetime: int = 60, api: int = CREW_PORTAL):
        """Request a fresh authentication token and return it.

        A new token is fetched on every call; nothing is cached.

        Args:
            lifetime: Sent as ``LifeTime`` in the credentials.
                NOTE(review): the unit is not visible here; confirm against
                the API documentation.
            api: ``CREW_PORTAL`` authenticates against the crew portal; any
                other value authenticates against the integration API.

        Raises:
            AdonisAuthenticationDeniedError: The credentials were rejected.
            AdonisError: The reply carried an error text.
        """
        credentials = {
            "credentials": {
                "Login": self.login,
                "Password": self.password,
                "LifeTime": lifetime
            }
        }
        if api == CREW_PORTAL:
            send = self.requestCrewPortal
        else:
            send = self.requestIntegration
        # The authentication call itself carries no token and is sent
        # unwrapped (add_token=False, wrap_request=False).
        reply = send("GNL_API_AUTHENTICATION", credentials, False, False)
        return reply["Authentication_Token"]

    def getCrewPersonalData(self, pin):
        """Return the ``DG_PersonalInformationRead`` result for ``pin``."""
        return self.requestCrewPortal(
            "DG_PersonalInformationRead", {"Pin": pin})

    def getCrewProfilePicture(self, pin):
        """Return the ``FileBase64`` value of the profile picture for ``pin``.

        Returns ``None`` when the API answers with an error text. An
        authentication denial is still raised, so that bad credentials are
        not mistaken for a missing picture.
        """
        try:
            # The request is what can raise AdonisError, so it has to be
            # inside the ``try``; a plain dict lookup never raises it.
            result = self.requestCrewPortal(
                "DG_ProfilePictureRead", {"Pin": pin})
        except AdonisAuthenticationDeniedError:
            # Re-raised explicitly in case it is a subclass of AdonisError.
            raise
        except AdonisError:
            return None
        return result["FileBase64"]

    def getCrew(self, pin, personal_data=True, profile_picture=True):
        """Create a ``Crew`` for ``pin`` and fill in the requested parts.

        Args:
            pin: Identifier passed to ``Crew`` and to the read calls.
            personal_data: When true, store ``getCrewPersonalData(pin)`` in
                ``crew._personal_data``.
            profile_picture: When true, store ``getCrewProfilePicture(pin)``
                in ``crew._profile_picture``.
        """
        crew = Crew(pin)
        if personal_data:
            crew._personal_data = self.getCrewPersonalData(pin)
        if profile_picture:
            crew._profile_picture = self.getCrewProfilePicture(pin)
        return crew
def getRequirements():
    """Placeholder with no implementation yet; always returns ``None``.

    NOTE(review): indentation was lost in this copy of the file, so it is
    unclear whether this is a module-level function or an ``Adonis``
    method. If it is meant to be a method, it needs a ``self`` parameter
    (or ``@staticmethod``) before it can be called on an instance.
    """