Kumi
5a64f31a1c
Updated the script execution commands to use `sys.executable` instead of a hard-coded "python" to ensure compatibility with different Python environments. Additionally, refactored code for better readability and consistency in formatting, particularly around argument parsing and error handling in both manage_hetzner_servers.py and update_local_config.py. Improves deployment flexibility and code maintainability.
109 lines
3.4 KiB
Python
109 lines
3.4 KiB
Python
import subprocess
|
|
import re
|
|
import tempfile
|
|
import os
|
|
import sys
|
|
import argparse
|
|
import pathlib
|
|
|
|
# Path to worker.py in the same directory as this file; execute_script() runs it.
SCRIPT_PATH = pathlib.Path(__file__).parent.joinpath("worker.py")
|
|
|
|
|
|
def execute_script(provider, location, server_type):
    """Run the worker script and return what it printed on stdout.

    The script at ``SCRIPT_PATH`` is started with the interpreter that is
    running this process and receives the three arguments as
    ``--provider``, ``--location`` and ``--server_type``.

    Args:
        provider: Value passed to the script's ``--provider`` option.
        location: Value passed to the script's ``--location`` option.
        server_type: Value passed to the script's ``--server_type`` option.

    Returns:
        The script's standard output with surrounding whitespace stripped.

    Raises:
        RuntimeError: If the script exits with a non-zero status. The
            message contains the captured standard error.
    """
    command = [
        # Use sys.executable unchanged. Resolving symlinks (realpath) would
        # point at the base interpreter and silently leave an active virtual
        # environment, so the worker could not import packages installed there.
        sys.executable,
        str(SCRIPT_PATH),
        "--provider",
        provider,
        "--location",
        location,
        "--server_type",
        server_type,
    ]
    result = subprocess.run(command, capture_output=True, text=True)
    if result.returncode != 0:
        # RuntimeError is a subclass of Exception, so existing handlers that
        # catch Exception keep working.
        raise RuntimeError(f"Error executing script: {result.stderr}")
    return result.stdout.strip()
|
|
|
|
|
|
def replace_peer_section(config_content, new_peer_section):
    """Return *config_content* with its first ``[Peer]`` section swapped out.

    A section is the ``[Peer]`` header plus every following line that does
    not begin with ``[``. Only the first such section is replaced. When the
    text has no ``[Peer]`` header, *new_peer_section* is appended after a
    newline instead.
    """
    found = re.search(r"\[Peer\](?:\n(?!\[)[^\n]*)*", config_content)

    # Nothing to replace: add the new section at the end.
    if found is None:
        return config_content + "\n" + new_peer_section

    # Splice the new section between the text before and after the old one.
    before = config_content[: found.start()]
    after = config_content[found.end() :]
    return before + new_peer_section + after
|
|
|
|
|
|
def main():
    """Provision a peer via the worker script and apply it to the local config.

    Parses the command line, calls ``execute_script`` to obtain a new
    ``[Peer]`` section, merges it into the local Wireguard configuration with
    ``replace_peer_section``, restarts the interface with ``wg-quick`` using
    the new configuration, and finally stores that configuration at the
    configured path.

    Raises:
        subprocess.CalledProcessError: If ``wg-quick up`` fails for the new
            configuration. The original configuration file is left untouched
            in that case.
    """
    # Local import: only this function needs shutil.
    import shutil

    parser = argparse.ArgumentParser(
        description="Update local Wireguard configuration with a new peer."
    )
    parser.add_argument(
        "--provider",
        type=str,
        choices=["hetzner", "aws", "digitalocean", "azure"],
        required=True,
        help="Cloud provider",
    )
    parser.add_argument("--location", type=str, required=True, help="Server location")
    parser.add_argument("--server_type", type=str, required=True, help="Server type")
    parser.add_argument(
        "--interface", type=str, required=True, help="Wireguard interface (e.g., wg0)"
    )
    parser.add_argument(
        "--config_path", type=str, help="Path to the Wireguard configuration file"
    )

    args = parser.parse_args()

    provider = args.provider
    location = args.location
    server_type = args.server_type
    interface = args.interface
    # Fall back to the conventional location when no explicit path is given.
    config_path = args.config_path or f"/etc/wireguard/{interface}.conf"

    # Step 1: Execute the existing script to set up the remote VPN server
    new_peer_section = execute_script(provider, location, server_type)

    # Step 2: Read the local Wireguard configuration file
    with open(config_path, "r") as file:
        config_content = file.read()

    # Step 3: Replace the existing [Peer] section with the new one
    updated_content = replace_peer_section(config_content, new_peer_section)

    # Step 4: Save the updated configuration file
    with tempfile.TemporaryDirectory() as tempdir:
        # The temporary file is named after the interface so that wg-quick,
        # which is given the file path below, sees "<interface>.conf".
        temp_file = pathlib.Path(tempdir) / f"{interface}.conf"
        temp_file_path = str(temp_file)

        with open(temp_file_path, "w") as file:
            file.write(updated_content)

        # Keep the permission bits of the existing config; otherwise the
        # replacement would end up with umask-default permissions.
        shutil.copymode(config_path, temp_file_path)

        # Step 5: Apply the updated Wireguard configuration
        # Bringing the interface down is best effort: it may not be up.
        subprocess.run(["wg-quick", "down", interface], stderr=subprocess.DEVNULL)
        # Abort if the new configuration cannot be applied, so a broken file
        # never overwrites the working one and no false success is printed.
        subprocess.run(["wg-quick", "up", temp_file_path], check=True)

        # Overwrite the original config file with the updated content.
        # shutil.move also works when the temporary directory is on a
        # different filesystem than config_path, where os.replace fails.
        shutil.move(temp_file_path, config_path)

    print(
        f"Local Wireguard configuration for {interface} updated and applied successfully."
    )
|
|
|
|
|
|
# Run the command-line entry point only when executed as a script.
if __name__ == "__main__":
    main()
|