164 lines
6.3 KiB
Python
164 lines
6.3 KiB
Python
|
#!/usr/bin/env python3
|
||
|
import subprocess
|
||
|
import time
|
||
|
import select
|
||
|
import pathlib
|
||
|
import sys
|
||
|
import tempfile
|
||
|
import argparse
|
||
|
|
||
|
|
||
|
def check_provisioner(provisioner_name: str) -> bool:
|
||
|
provisioner_exists_command = ["step-cli", "ca", "provisioner", "list"]
|
||
|
provisioner_exists_process = subprocess.run(
|
||
|
provisioner_exists_command,
|
||
|
capture_output=True,
|
||
|
text=True,
|
||
|
check=True)
|
||
|
|
||
|
if provisioner_name in provisioner_exists_process.stdout:
|
||
|
return True
|
||
|
|
||
|
return False
|
||
|
|
||
|
|
||
|
def delete_provisioner(provisioner_name: str, admin_provisioner: str, admin_subject: str):
|
||
|
delete_command = ["step-cli", "ca", "provisioner", "remove",
|
||
|
provisioner_name,
|
||
|
"--admin-provisioner", admin_provisioner,
|
||
|
"--admin-subject", admin_subject]
|
||
|
|
||
|
try:
|
||
|
delete_process = subprocess.run(delete_command, check=True)
|
||
|
except subprocess.CalledProcessError as e:
|
||
|
raise ValueError(f"Error: Failed to delete provisioner: {e}")
|
||
|
|
||
|
|
||
|
def add_provisioner(provisioner_name: str, admin_provisioner: str, admin_subject: str, private_key_path: str, public_key_path: str):
|
||
|
# Decrypt the private key using gpg and write it to a temporary file
|
||
|
with tempfile.NamedTemporaryFile() as temp_file:
|
||
|
gpg_command = ["gpg", "--decrypt", str(private_key_path)]
|
||
|
gpg_process = subprocess.run(gpg_command, stdout=temp_file)
|
||
|
|
||
|
if gpg_process.returncode != 0:
|
||
|
raise ValueError("Failed to decrypt private key")
|
||
|
|
||
|
try:
|
||
|
# Add the JWK provisioner using the step CLI
|
||
|
step_command = [
|
||
|
"step-cli", "ca", "provisioner", "add",
|
||
|
"--type", "jwk",
|
||
|
"--public-key", str(public_key_path),
|
||
|
"--private-key", temp_file.name,
|
||
|
provisioner_name,
|
||
|
"--admin-provisioner", admin_provisioner,
|
||
|
"--admin-subject", admin_subject,
|
||
|
]
|
||
|
step_process = subprocess.run(step_command, check=True)
|
||
|
except subprocess.CalledProcessError as e:
|
||
|
raise ValueError(f"Error: Failed to add provisioner: {e}")
|
||
|
|
||
|
|
||
|
def decrypt_file(input_file: pathlib.Path, output_file: pathlib.Path = None) -> pathlib.Path:
|
||
|
with tempfile.NamedTemporaryFile(delete=False) as temp_file:
|
||
|
gpg_command = ["gpg", "--decrypt", str(input_file)]
|
||
|
|
||
|
try:
|
||
|
gpg_process = subprocess.run(
|
||
|
gpg_command, stdout=temp_file, check=True)
|
||
|
except subprocess.CalledProcessError as e:
|
||
|
pathlib.Path(temp_file.name).unlink()
|
||
|
raise ValueError(f"Error: Failed decrypting file: {e}")
|
||
|
|
||
|
return pathlib.Path(temp_file.name)
|
||
|
|
||
|
|
||
|
def main():
|
||
|
# Get the script's directory
|
||
|
script_dir = pathlib.Path(__file__).parent
|
||
|
|
||
|
parser = argparse.ArgumentParser(
|
||
|
description="Add and delete a JWK provisioner using the Step CLI.")
|
||
|
parser.add_argument("--admin-provisioner", dest="admin_provisioner", default="KumiDC",
|
||
|
help="The name of the admin provisioner to use. Default is 'KumiDC'.")
|
||
|
parser.add_argument("--admin-subject", dest="admin_subject", default="admin",
|
||
|
help="The email address of the admin subject to use. Default is 'admin'.")
|
||
|
parser.add_argument("--provisioner-name", dest="provisioner_name", default="tempjwk",
|
||
|
help="The name of the JWK provisioner to add/delete. Default is 'tempjwk'.")
|
||
|
parser.add_argument("--public-key", dest="public_key_path", default="key.pub",
|
||
|
help="The path to the public key file for the provisioner. Default is 'key.pub'.")
|
||
|
parser.add_argument("--private-key", dest="private_key_path", default="key.priv.gpg",
|
||
|
help="The path to the GPG encrypted private key file for the provisioner. Default is 'key.priv.gpg'.")
|
||
|
parser.add_argument("--timeout", dest="timeout", type=int, default=300,
|
||
|
help="The number of seconds to wait for the user to delete the provisioner. Default is 300 seconds.")
|
||
|
|
||
|
args = parser.parse_args()
|
||
|
|
||
|
# Construct the full default paths to the key files
|
||
|
public_key_path = script_dir / args.public_key_path
|
||
|
private_key_path = script_dir / args.private_key_path
|
||
|
|
||
|
# Check that the public key file exists
|
||
|
if not public_key_path.exists():
|
||
|
print("Error: public key file does not exist")
|
||
|
exit(1)
|
||
|
|
||
|
try:
|
||
|
provisioner_exists = check_provisioner(args.provisioner_name)
|
||
|
except subprocess.CalledProcessError as e:
|
||
|
print(f"Error: Failed to check if provisioner exists: {e}")
|
||
|
exit(1)
|
||
|
|
||
|
if provisioner_exists:
|
||
|
print(
|
||
|
f"Provisioner {args.provisioner_name} already exists. Deleting existing provisioner...")
|
||
|
try:
|
||
|
delete_provisioner(args.provisioner_name,
|
||
|
args.admin_provisioner,
|
||
|
args.admin_subject)
|
||
|
|
||
|
except subprocess.CalledProcessError as e:
|
||
|
print(
|
||
|
f"Error: Failed to delete existing provisioner {args.provisioner_name}: {e}")
|
||
|
exit(1)
|
||
|
|
||
|
print(f"Existing provisioner {args.provisioner_name} deleted.")
|
||
|
|
||
|
try:
|
||
|
add_provisioner(args.provisioner_name,
|
||
|
args.admin_provisioner,
|
||
|
args.admin_subject,
|
||
|
args.private_key_path,
|
||
|
args.public_key_path)
|
||
|
|
||
|
except subprocess.CalledProcessError as e:
|
||
|
print(f"Error: Failed to add provisioner {args.provisioner_name}: {e}")
|
||
|
exit(1)
|
||
|
|
||
|
print(f"Provisioner {args.provisioner_name} added.")
|
||
|
print(f"Press any key to delete it (up to {args.timeout} seconds)")
|
||
|
|
||
|
# Wait for the user to press any key (up to TIMEOUT seconds)
|
||
|
start_time = time.time()
|
||
|
while True:
|
||
|
if time.time() - start_time > args.timeout:
|
||
|
break
|
||
|
if select.select([sys.stdin,], [], [], 0.5)[0]:
|
||
|
break
|
||
|
|
||
|
# Delete the provisioner
|
||
|
try:
|
||
|
delete_provisioner(args.provisioner_name,
|
||
|
args.admin_provisioner,
|
||
|
args.admin_subject)
|
||
|
except subprocess.CalledProcessError as e:
|
||
|
print(
|
||
|
f"Error: Failed to delete provisioner {args.provisioner_name}: {e}")
|
||
|
exit(1)
|
||
|
|
||
|
print(f"Provisioner {args.provisioner_name} deleted.")
|
||
|
|
||
|
|
||
|
if __name__ == "__main__":
|
||
|
main()
|