Current status
This commit is contained in:
commit
a439cddeae
1 changed files with 163 additions and 0 deletions
163
tempjwk.py
Executable file
163
tempjwk.py
Executable file
|
@ -0,0 +1,163 @@
|
|||
#!/usr/bin/env python3
|
||||
import subprocess
|
||||
import time
|
||||
import select
|
||||
import pathlib
|
||||
import sys
|
||||
import tempfile
|
||||
import argparse
|
||||
|
||||
|
||||
def check_provisioner(provisioner_name: str) -> bool:
|
||||
provisioner_exists_command = ["step-cli", "ca", "provisioner", "list"]
|
||||
provisioner_exists_process = subprocess.run(
|
||||
provisioner_exists_command,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
check=True)
|
||||
|
||||
if provisioner_name in provisioner_exists_process.stdout:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def delete_provisioner(provisioner_name: str, admin_provisioner: str, admin_subject: str):
|
||||
delete_command = ["step-cli", "ca", "provisioner", "remove",
|
||||
provisioner_name,
|
||||
"--admin-provisioner", admin_provisioner,
|
||||
"--admin-subject", admin_subject]
|
||||
|
||||
try:
|
||||
delete_process = subprocess.run(delete_command, check=True)
|
||||
except subprocess.CalledProcessError as e:
|
||||
raise ValueError(f"Error: Failed to delete provisioner: {e}")
|
||||
|
||||
|
||||
def add_provisioner(provisioner_name: str, admin_provisioner: str, admin_subject: str, private_key_path: str, public_key_path: str):
|
||||
# Decrypt the private key using gpg and write it to a temporary file
|
||||
with tempfile.NamedTemporaryFile() as temp_file:
|
||||
gpg_command = ["gpg", "--decrypt", str(private_key_path)]
|
||||
gpg_process = subprocess.run(gpg_command, stdout=temp_file)
|
||||
|
||||
if gpg_process.returncode != 0:
|
||||
raise ValueError("Failed to decrypt private key")
|
||||
|
||||
try:
|
||||
# Add the JWK provisioner using the step CLI
|
||||
step_command = [
|
||||
"step-cli", "ca", "provisioner", "add",
|
||||
"--type", "jwk",
|
||||
"--public-key", str(public_key_path),
|
||||
"--private-key", temp_file.name,
|
||||
provisioner_name,
|
||||
"--admin-provisioner", admin_provisioner,
|
||||
"--admin-subject", admin_subject,
|
||||
]
|
||||
step_process = subprocess.run(step_command, check=True)
|
||||
except subprocess.CalledProcessError as e:
|
||||
raise ValueError(f"Error: Failed to add provisioner: {e}")
|
||||
|
||||
|
||||
def decrypt_file(input_file: pathlib.Path, output_file: pathlib.Path = None) -> pathlib.Path:
|
||||
with tempfile.NamedTemporaryFile(delete=False) as temp_file:
|
||||
gpg_command = ["gpg", "--decrypt", str(input_file)]
|
||||
|
||||
try:
|
||||
gpg_process = subprocess.run(
|
||||
gpg_command, stdout=temp_file, check=True)
|
||||
except subprocess.CalledProcessError as e:
|
||||
pathlib.Path(temp_file.name).unlink()
|
||||
raise ValueError(f"Error: Failed decrypting file: {e}")
|
||||
|
||||
return pathlib.Path(temp_file.name)
|
||||
|
||||
|
||||
def main():
|
||||
# Get the script's directory
|
||||
script_dir = pathlib.Path(__file__).parent
|
||||
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Add and delete a JWK provisioner using the Step CLI.")
|
||||
parser.add_argument("--admin-provisioner", dest="admin_provisioner", default="KumiDC",
|
||||
help="The name of the admin provisioner to use. Default is 'KumiDC'.")
|
||||
parser.add_argument("--admin-subject", dest="admin_subject", default="admin",
|
||||
help="The email address of the admin subject to use. Default is 'admin'.")
|
||||
parser.add_argument("--provisioner-name", dest="provisioner_name", default="tempjwk",
|
||||
help="The name of the JWK provisioner to add/delete. Default is 'tempjwk'.")
|
||||
parser.add_argument("--public-key", dest="public_key_path", default="key.pub",
|
||||
help="The path to the public key file for the provisioner. Default is 'key.pub'.")
|
||||
parser.add_argument("--private-key", dest="private_key_path", default="key.priv.gpg",
|
||||
help="The path to the GPG encrypted private key file for the provisioner. Default is 'key.priv.gpg'.")
|
||||
parser.add_argument("--timeout", dest="timeout", type=int, default=300,
|
||||
help="The number of seconds to wait for the user to delete the provisioner. Default is 300 seconds.")
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
# Construct the full default paths to the key files
|
||||
public_key_path = script_dir / args.public_key_path
|
||||
private_key_path = script_dir / args.private_key_path
|
||||
|
||||
# Check that the public key file exists
|
||||
if not public_key_path.exists():
|
||||
print("Error: public key file does not exist")
|
||||
exit(1)
|
||||
|
||||
try:
|
||||
provisioner_exists = check_provisioner(args.provisioner_name)
|
||||
except subprocess.CalledProcessError as e:
|
||||
print(f"Error: Failed to check if provisioner exists: {e}")
|
||||
exit(1)
|
||||
|
||||
if provisioner_exists:
|
||||
print(
|
||||
f"Provisioner {args.provisioner_name} already exists. Deleting existing provisioner...")
|
||||
try:
|
||||
delete_provisioner(args.provisioner_name,
|
||||
args.admin_provisioner,
|
||||
args.admin_subject)
|
||||
|
||||
except subprocess.CalledProcessError as e:
|
||||
print(
|
||||
f"Error: Failed to delete existing provisioner {args.provisioner_name}: {e}")
|
||||
exit(1)
|
||||
|
||||
print(f"Existing provisioner {args.provisioner_name} deleted.")
|
||||
|
||||
try:
|
||||
add_provisioner(args.provisioner_name,
|
||||
args.admin_provisioner,
|
||||
args.admin_subject,
|
||||
args.private_key_path,
|
||||
args.public_key_path)
|
||||
|
||||
except subprocess.CalledProcessError as e:
|
||||
print(f"Error: Failed to add provisioner {args.provisioner_name}: {e}")
|
||||
exit(1)
|
||||
|
||||
print(f"Provisioner {args.provisioner_name} added.")
|
||||
print(f"Press any key to delete it (up to {args.timeout} seconds)")
|
||||
|
||||
# Wait for the user to press any key (up to TIMEOUT seconds)
|
||||
start_time = time.time()
|
||||
while True:
|
||||
if time.time() - start_time > args.timeout:
|
||||
break
|
||||
if select.select([sys.stdin,], [], [], 0.5)[0]:
|
||||
break
|
||||
|
||||
# Delete the provisioner
|
||||
try:
|
||||
delete_provisioner(args.provisioner_name,
|
||||
args.admin_provisioner,
|
||||
args.admin_subject)
|
||||
except subprocess.CalledProcessError as e:
|
||||
print(
|
||||
f"Error: Failed to delete provisioner {args.provisioner_name}: {e}")
|
||||
exit(1)
|
||||
|
||||
print(f"Provisioner {args.provisioner_name} deleted.")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
Loading…
Reference in a new issue