Changes
This commit is contained in:
parent
c9ed677f0a
commit
4b58d1a474
9 changed files with 134 additions and 56 deletions
6
.gitignore
vendored
6
.gitignore
vendored
|
@ -2,5 +2,11 @@ __pycache__/
|
|||
*.pyc
|
||||
*.swp
|
||||
settings.ini
|
||||
completionmail.ini
|
||||
venv/
|
||||
output/
|
||||
reports/*.py
|
||||
!reports/__init__.py
|
||||
.vscode
|
||||
*.old
|
||||
*.bak
|
||||
|
|
0
__init__.py
Normal file
0
__init__.py
Normal file
|
@ -1,52 +0,0 @@
|
|||
from classes.config import MonsterConfig
|
||||
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
import argparse
|
||||
import csv
|
||||
|
||||
|
||||
config = MonsterConfig("settings.ini")
|
||||
|
||||
certs = []
|
||||
|
||||
before = datetime.utcnow().replace(hour=0,minute=0,second=0,microsecond=0)
|
||||
after = before - timedelta(days=1)
|
||||
|
||||
for vessel in config.vessels:
|
||||
users = dict()
|
||||
|
||||
ocourses = vessel.getCourses()
|
||||
courses = dict()
|
||||
|
||||
for ocourse in ocourses:
|
||||
courses[ocourse["id"]] = ocourse
|
||||
|
||||
for ocert in vessel.getCerts(after=after.timestamp(), before=before.timestamp()):
|
||||
if ocert["cert"]:
|
||||
cert = dict()
|
||||
|
||||
user_id = ocert["userid"]
|
||||
if not (user := users.get(user_id)):
|
||||
user = vessel.getUsers(id=user_id)[user_id]
|
||||
users[user_id] = user
|
||||
|
||||
cert["user_name"] = f'{user["firstname"]} {user["lastname"]}'
|
||||
cert["user_email"] = user["email"]
|
||||
cert["user_pin"] = user["custom_fields"].get("pin")
|
||||
cert["course_id"] = ocert["cert"]["course"]
|
||||
cert["course_shortname"] = courses[ocert["cert"]["course"]]["shortname"]
|
||||
cert["course_fullname"] = courses[ocert["cert"]["course"]]["fullname"]
|
||||
cert["code"] = ocert["code"]
|
||||
cert["time_created"] = datetime.utcfromtimestamp(ocert["timecreated"]).strftime('%Y-%m-%d %H:%M:%S')
|
||||
cert["vessel"] = vessel.name
|
||||
certs.append(cert)
|
||||
|
||||
certs = sorted(certs, key=lambda d: d["time_created"])
|
||||
|
||||
keys = ["user_name", "user_email", "user_pin", "course_id", "course_shortname", "course_fullname", "code", "time_created", "vessel"]
|
||||
|
||||
with open('test.csv', 'a') as output_file:
|
||||
dict_writer = csv.DictWriter(output_file, restval="", fieldnames=keys, delimiter=';')
|
||||
dict_writer.writeheader()
|
||||
dict_writer.writerows(certs)
|
|
@ -29,10 +29,16 @@ class MonsterConfig:
|
|||
if section.startswith("Vessel"):
|
||||
self.vessels.append(Vessel.fromConfig(parser[section]))
|
||||
|
||||
try:
|
||||
self.pyadonis = Path(parser["MONSTER"]["PyAdonis"])
|
||||
except KeyError:
|
||||
print(f"PyAdonis is not defined in the MONSTER section of {path}, some features may be missing.")
|
||||
|
||||
def __init__(self, path: Union[str, Path]) -> None:
|
||||
"""Initialize a new (empty) MonsterConfig object
|
||||
"""
|
||||
self.vessels = []
|
||||
self.pyadonis = None
|
||||
|
||||
if path:
|
||||
self.readFile(path)
|
||||
|
|
|
@ -5,6 +5,7 @@ from typing import Optional, Union
|
|||
from datetime import datetime
|
||||
|
||||
from MySQLdb.cursors import DictCursor
|
||||
from bcrypt import hashpw, gensalt
|
||||
|
||||
from const import *
|
||||
|
||||
|
@ -77,10 +78,18 @@ class Vessel:
|
|||
def connect(self):
|
||||
return Database(self)
|
||||
|
||||
@staticmethod
|
||||
def getTimestamp() -> int:
|
||||
return int(datetime.now().timestamp())
|
||||
|
||||
def getCourses(self) -> list:
|
||||
results = self.db._execute(QUERY_COURSE, ctype=DictCursor)
|
||||
return results
|
||||
|
||||
def getCourseContext(self, courseid: int) -> Optional[dict]:
|
||||
results = self.db._execute(QUERY_COURSE_CONTEXT, (courseid,), ctype=DictCursor)
|
||||
return results
|
||||
|
||||
def getUserInfoFields(self) -> list:
|
||||
results = self.db._execute(QUERY_USER_INFO_FIELD, ctype=DictCursor)
|
||||
return results
|
||||
|
@ -133,11 +142,15 @@ class Vessel:
|
|||
odata = self.getUserInfoData(ofield["id"], id)
|
||||
|
||||
for value in odata:
|
||||
try:
|
||||
users[value["userid"]]["custom_fields"][ofield["shortname"]] = value["data"]
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
return users
|
||||
|
||||
def getHTMLCerts(self, after: int = 0, before: int = int(datetime.now().timestamp())):
|
||||
def getHTMLCerts(self, after: int = 0, before: Optional[int] = None):
|
||||
before = before or Vessel.getTimestamp()
|
||||
results = self.db._execute(f"{QUERY_HTML_CERT_ISSUES} {QUERY_WHERE_TIMESTAMPS % {'column': 'timecreated', 'after': after, 'before': before}}", ctype=DictCursor)
|
||||
ocerts = self.db._execute(QUERY_HTML_CERT, ctype=DictCursor)
|
||||
|
||||
|
@ -156,7 +169,8 @@ class Vessel:
|
|||
|
||||
return results
|
||||
|
||||
def getCustomCerts(self, after: int = 0, before: int = int(datetime.now().timestamp())):
|
||||
def getCustomCerts(self, after: int = 0, before: Optional[int] = None):
|
||||
before = before or Vessel.getTimestamp()
|
||||
results = self.db._execute(f"{QUERY_CUSTOM_CERT_ISSUES} {QUERY_WHERE_TIMESTAMPS % {'column': 'timecreated', 'after': after, 'before': before}}", ctype=DictCursor)
|
||||
ocerts = self.db._execute(QUERY_CUSTOM_CERT, ctype=DictCursor)
|
||||
|
||||
|
@ -175,6 +189,78 @@ class Vessel:
|
|||
|
||||
return results
|
||||
|
||||
def getCerts(self, after: int = 0, before: int = int(datetime.now().timestamp())):
|
||||
def getCerts(self, after: int = 0, before: Optional[int] = None):
|
||||
before = before or Vessel.getTimestamp()
|
||||
return sorted(self.getHTMLCerts(after, before) + self.getCustomCerts(after, before), key=lambda d: d["timecreated"])
|
||||
|
||||
def setPassword(self, username: str, password: str):
|
||||
hashed = hashpw(password.encode(), gensalt(prefix=b"2b"))
|
||||
query = QUERY_USER_SET_PASSWORD
|
||||
self.db._execute(query, (password, username))
|
||||
|
||||
def getEnrols(self, enrol: Optional[str] = None):
|
||||
results = list(self.db._execute(QUERY_ENROL, ctype=DictCursor))
|
||||
|
||||
if enrol:
|
||||
return list(filter(lambda x: x["enrol"] == enrol, results))
|
||||
|
||||
return results
|
||||
|
||||
def createEnrol(self, courseid: int, enrol: str = "manual"):
|
||||
self.db._execute(QUERY_ENROL_INSERT, (enrol, courseid))
|
||||
|
||||
def createEnrolment(self, userid: int, courseid: int, enrol: str = "manual"):
|
||||
enrol = list(filter(lambda x: x["courseid"] == courseid, self.getEnrols(enrol)))
|
||||
|
||||
if not enrol:
|
||||
self.createEnrol(courseid, enrol)
|
||||
enrol = list(filter(lambda x: x["courseid"] == courseid , self.getEnrols(enrol)))
|
||||
|
||||
assert enrol
|
||||
|
||||
self.db._execute(QUERY_ENROL_USER, (enrol[0]["id"], userid, Vessel.getTimestamp(), Vessel.getTimestamp()))
|
||||
|
||||
def getEnrolments(self):
|
||||
results = list(self.db._execute(QUERY_ENROLMENTS, ctype=DictCursor))
|
||||
return results
|
||||
|
||||
def createUser(self, username, password, email, firstname, lastname):
|
||||
email = email or f"{username}@pin.seachefsacademy.com"
|
||||
self.db._execute(QUERY_USER_CREATE, (username, email, firstname, lastname, Vessel.getTimestamp(), Vessel.getTimestamp()))
|
||||
self.setPassword(username, password)
|
||||
|
||||
def assignRole(self, userid: int, courseid: int, roleid: int = 5):
|
||||
contextid = self.getCourseContext(courseid)[0]["id"]
|
||||
self.db._execute(QUERY_ASSIGN_ROLE, (roleid, contextid, userid, Vessel.getTimestamp()))
|
||||
|
||||
def getRole(self, userid: int, courseid: int) -> Optional[int]:
|
||||
contextid = self.getCourseContext(courseid)[0]["id"]
|
||||
results = self.db._execute(QUERY_GET_ROLE, (contextid, userid), ctype=DictCursor)
|
||||
if results:
|
||||
return results[0]["roleid"]
|
||||
|
||||
def getUserIdByName(self, username: str) -> Optional[int]:
|
||||
results = self.db._execute(QUERY_GET_USERID, (username,), ctype=DictCursor)
|
||||
if results:
|
||||
return results[0]["id"]
|
||||
|
||||
def setEmail(self, userid: int, email: str):
|
||||
email = email or f"{self.getUsers(id=userid)[userid]['username']}@pin.seachefsacademy.com"
|
||||
self.db._execute(QUERY_USER_SET_EMAIL, (email, userid))
|
||||
|
||||
def getCustomCourseFields(self):
|
||||
results = list(self.db._execute(QUERY_COURSE_FIELDS, ctype=DictCursor))
|
||||
return results
|
||||
|
||||
def getCourseByContext(self, contextid: int) -> Optional[int]:
|
||||
results = self.db._execute(QUERY_COURSE_CONTEXT_REVERSE, (contextid,), ctype=DictCursor)
|
||||
if results:
|
||||
return results[0]["instanceid"]
|
||||
|
||||
def getCourseModules(self, courseid: int):
|
||||
results = list(self.db._execute(QUERY_COURSE_MODULES, (courseid,), ctype=DictCursor))
|
||||
return results
|
||||
|
||||
def getCourseModuleCompletion(self, moduleid: int):
|
||||
results = list(self.db._execute(QUERY_MODULE_COMPLETION, (moduleid,), ctype=DictCursor))
|
||||
return results
|
||||
|
|
15
const.py
15
const.py
|
@ -7,7 +7,22 @@ QUERY_CUSTOM_CERT = "SELECT * FROM mdl_customcert"
|
|||
QUERY_USER = "SELECT * FROM mdl_user"
|
||||
QUERY_USER_INFO_FIELD = "SELECT * FROM mdl_user_info_field"
|
||||
QUERY_USER_INFO_DATA = "SELECT * FROM mdl_user_info_data"
|
||||
QUERY_USER_SET_PASSWORD = "UPDATE mdl_user SET password = %s WHERE username = %s"
|
||||
QUERY_USER_SET_EMAIL = "UPDATE mdl_user SET email = %s WHERE id = %s"
|
||||
QUERY_USER_CREATE = "INSERT INTO mdl_user(username, email, firstname, lastname, timecreated, timemodified, mnethostid, confirmed) VALUES (%s, %s, %s, %s, %s, %s, 1, 1)"
|
||||
QUERY_ASSIGN_ROLE = "INSERT INTO mdl_role_assignments(roleid, contextid, userid, timemodified) VALUES (%s, %s, %s, %s)"
|
||||
QUERY_GET_ROLE = "SELECT * FROM mdl_role_assignments WHERE contextid = %s AND userid = %s"
|
||||
QUERY_GET_USERID = "SELECT * FROM mdl_user WHERE username = %s"
|
||||
|
||||
QUERY_COURSE = "SELECT * FROM mdl_course"
|
||||
QUERY_ENROL = "SELECT * FROM mdl_enrol"
|
||||
QUERY_ENROL_INSERT = "INSERT INTO mdl_enrol(enrol, courseid) VALUES (%s, %s)"
|
||||
QUERY_ENROL_USER = "INSERT INTO mdl_user_enrolments(enrolid, userid, timecreated, timemodified) VALUES (%s, %s, %s, %s)"
|
||||
QUERY_ENROLMENTS = "SELECT u.username AS username, c.shortname AS courseid, ue.timecreated AS timecreated FROM mdl_user_enrolments ue JOIN mdl_user u ON u.id = ue.userid JOIN mdl_enrol e ON ue.enrolid = e.id JOIN mdl_course c ON e.courseid = c.id"
|
||||
QUERY_COURSE_CONTEXT = "SELECT * FROM mdl_context WHERE contextlevel = 50 AND instanceid = %s"
|
||||
QUERY_COURSE_CONTEXT_REVERSE = "SELECT * FROM mdl_context WHERE id = %s"
|
||||
QUERY_COURSE_FIELDS = "SELECT * FROM mdl_customfield_category cat JOIN mdl_customfield_field fld ON fld.categoryid = cat.id JOIN mdl_customfield_data dat ON dat.fieldid = fld.id"
|
||||
QUERY_COURSE_MODULES = "SELECT * FROM mdl_course_modules WHERE course = %s"
|
||||
QUERY_MODULE_COMPLETION = "SELECT * FROM mdl_course_modules_completion WHERE coursemoduleid = %s"
|
||||
|
||||
QUERY_WHERE_TIMESTAMPS = "WHERE %(column)s >= %(after)i AND %(column)s < %(before)i"
|
||||
|
|
0
reports/__init__.py
Normal file
0
reports/__init__.py
Normal file
16
runreport.py
Normal file
16
runreport.py
Normal file
|
@ -0,0 +1,16 @@
|
|||
import argparse
|
||||
import importlib
|
||||
import sys
|
||||
|
||||
parser = argparse.ArgumentParser(description='Run a specified report')
|
||||
|
||||
parser.add_argument('report') #, type=str, help='name of the report to execute')
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
sys.argv = sys.argv[1:]
|
||||
|
||||
try:
|
||||
importlib.import_module("reports.%s" % args.report)
|
||||
except ImportError as e:
|
||||
print("Error: Could not import %s: %s" % (args.report, str(e)))
|
|
@ -1,4 +1,5 @@
|
|||
[MONSTER]
|
||||
PyAdonis = /opt/pyadonis/
|
||||
|
||||
[Vessel vessel1]
|
||||
Host = 10.12.13.14
|
||||
|
|
Loading…
Reference in a new issue