unrip/scripts/deploy/forgejo_repo_bootstrap.py

138 lines
5.7 KiB
Python
Executable file

#!/usr/bin/env python3
import argparse
import base64
import json
import ssl
import urllib.error
import urllib.parse
import urllib.request
from pathlib import Path
class ForgejoClient:
def __init__(self, base_url: str, username: str | None = None, password: str | None = None, token: str | None = None):
self.base_url = base_url.rstrip('/')
self.username = username or ''
self.headers = {
'Accept': 'application/json',
'Content-Type': 'application/json',
}
if token:
self.headers['Authorization'] = f'token {token}'
elif username is not None and password is not None:
credentials = base64.b64encode(f'{username}:{password}'.encode()).decode()
self.headers['Authorization'] = f'Basic {credentials}'
else:
raise ValueError('ForgejoClient requires either token auth or username/password auth')
self.ssl_context = ssl.create_default_context()
def request(self, method: str, path: str, payload=None, expected=(200, 201, 204)):
url = f'{self.base_url}{path}'
data = None
if payload is not None:
data = json.dumps(payload).encode()
req = urllib.request.Request(url, data=data, method=method)
for key, value in self.headers.items():
req.add_header(key, value)
try:
with urllib.request.urlopen(req, context=self.ssl_context) as response:
body = response.read().decode() if response.length != 0 else ''
if response.status not in expected:
raise RuntimeError(f'{method} {path} returned {response.status}: {body}')
return json.loads(body) if body else None
except urllib.error.HTTPError as exc:
body = exc.read().decode()
if exc.code not in expected:
raise RuntimeError(f'{method} {path} returned {exc.code}: {body}') from exc
return json.loads(body) if body else None
def get_repo(self, owner: str, repo: str):
try:
return self.request('GET', f'/api/v1/repos/{owner}/{repo}')
except RuntimeError as exc:
if ' returned 404:' in str(exc):
return None
raise
def create_repo(self, owner: str, name: str, private: bool):
payload = {
'name': name,
'private': private,
'auto_init': False,
'default_branch': 'main',
}
if owner == self.username:
return self.request('POST', '/api/v1/user/repos', payload, expected=(201,))
return self.request('POST', f'/api/v1/orgs/{urllib.parse.quote(owner)}/repos', payload, expected=(201,))
def upsert_variable(self, owner: str, repo: str, name: str, value: str):
try:
self.request(
'POST',
f'/api/v1/repos/{owner}/{repo}/actions/variables/{urllib.parse.quote(name)}',
{'value': value},
expected=(201, 204),
)
except RuntimeError as exc:
if ' returned 409:' not in str(exc) and ' returned 422:' not in str(exc):
raise
self.request(
'PUT',
f'/api/v1/repos/{owner}/{repo}/actions/variables/{urllib.parse.quote(name)}',
{'value': value},
expected=(201, 204),
)
def upsert_secret(self, owner: str, repo: str, name: str, value: str):
self.request(
'PUT',
f'/api/v1/repos/{owner}/{repo}/actions/secrets/{urllib.parse.quote(name)}',
{'data': value},
expected=(201, 204),
)
def main():
    """Command-line entry point: ensure the repo exists, then push CI settings.

    Parses the command line, builds a ``ForgejoClient``, creates the target
    repository when ``get_repo`` reports it missing, uploads the
    base64-encoded kubeconfig file as the ``KUBECONFIG_B64`` Actions secret,
    and finally upserts the deployment-related Actions variables.
    """
    parser = argparse.ArgumentParser(description='Bootstrap Forgejo repo secrets/variables for app deployment')
    parser.add_argument('--forgejo-url', required=True)
    # Credentials are optional here; ForgejoClient validates the combination.
    for optional_flag in ('--admin-username', '--admin-password', '--token'):
        parser.add_argument(optional_flag)
    for required_flag in ('--repo-owner', '--repo-name'):
        parser.add_argument(required_flag, required=True)
    parser.add_argument('--repo-private', action='store_true', default=False)
    for required_flag in (
        '--ci-kubeconfig',
        '--registry-host',
        '--project-name',
        '--project-namespace',
        '--project-deployments',
        '--project-registry-secret-name',
    ):
        parser.add_argument(required_flag, required=True)
    args = parser.parse_args()

    owner, repo_name = args.repo_owner, args.repo_name
    client = ForgejoClient(args.forgejo_url, args.admin_username, args.admin_password, args.token)

    existing = client.get_repo(owner, repo_name)
    if existing is not None:
        print(f'repo already exists: {existing["full_name"]}')
    else:
        created = client.create_repo(owner, repo_name, args.repo_private)
        print(f'created repo {created["full_name"]}')

    # The kubeconfig is stored base64-encoded, as the secret name indicates.
    raw_kubeconfig = Path(args.ci_kubeconfig).read_bytes()
    kubeconfig_b64 = base64.b64encode(raw_kubeconfig).decode()
    client.upsert_secret(owner, repo_name, 'KUBECONFIG_B64', kubeconfig_b64)
    print('upserted repo action secret KUBECONFIG_B64')

    # Upserted in this fixed order, one API round-trip per variable.
    action_variables = (
        ('REGISTRY_HOST', args.registry_host),
        ('PROJECT_NAME', args.project_name),
        ('PROJECT_NAMESPACE', args.project_namespace),
        ('PROJECT_DEPLOYMENTS', args.project_deployments),
        ('PROJECT_REGISTRY_SECRET_NAME', args.project_registry_secret_name),
    )
    for variable_name, variable_value in action_variables:
        client.upsert_variable(owner, repo_name, variable_name, variable_value)
    print('upserted repo action variables')


if __name__ == '__main__':
    main()