#!/usr/bin/env python3 """ This script downloads issues from a given Gitea repository and produces a pull request for each issue. It assumes that the default branch (default "main") exists and that you have a valid API token if authentication is required. """ import logging from pathlib import Path import argparse import requests import sys import dataclasses import tempfile import subprocess import os import re from . import secrets from .seen_issues_db import SeenIssuesDB def generate_branch_name(issue_number: str, issue_title: str) -> str: """ Create a branch name by sanitizing the issue title. Non-alphanumeric characters (except spaces) are removed, the text is lowercased, and spaces are replaced with dashes. """ sanitized = re.sub(r"[^0-9a-zA-Z ]+", "", issue_title) parts = ['issue', str(issue_number), *sanitized.lower().split()] return "-".join(parts) logger = logging.getLogger(__name__) def bash_cmd(*commands: str) -> str: return "bash -c \""+';'.join(commands)+"\"" AIDER_TEST=bash_cmd( "virtualenv venv", "source venv/bin/activate", "pip install -e .", "pytest test", ) AIDER_LINT=bash_cmd( "ruff format", "ruff check --fix --ignore RUF022 --ignore PGH004", "ruff format", "ruff check --ignore RUF022 --ignore PGH004", ) MODEL = 'o3-mini' LLM_MESSAGE_FORMAT = """ {issue} # Solution Details For code tasks: 1. Create a plan for how to solve the issue. 2. Write unit tests that proves that your solution works. 3. Then, solve the issue by writing the required code. """ def create_aider_command(issue: str) -> list[str]: return [ 'aider', '--model', MODEL, '--chat-language', 'english', '--test-cmd', AIDER_TEST, '--lint-cmd', AIDER_LINT, '--auto-test', '--no-auto-lint', '--api-key', secrets.llm_api_key(), '--read', 'CONVENTIONS.md', '--message', LLM_MESSAGE_FORMAT.format(issue=issue), '--yes-always', '--architect', ] class GiteaClient: def __init__(self, gitea_url: str, token: str) -> None: assert not gitea_url.endswith('/api/v1') self.gitea_url = gitea_url + '/api/v1' self.session = requests.Session() self.session.headers["Content-Type"] = "application/json" if token: self.session.headers["Authorization"] = f"token {token}" def get_default_branch_sha(self, owner, repo, branch): """Retrieve the commit SHA of the default branch.""" url = f"{self.gitea_url}/repos/{owner}/{repo}/branches/{branch}" response = self.session.get(url) response.raise_for_status() data = response.json() return data['commit']['sha'] def create_branch(self, owner, repo, new_branch, sha): """Create a new branch from the provided SHA.""" url = f"{self.gitea_url}/repos/{owner}/{repo}/git/refs" json_data = {"ref": f"refs/heads/{new_branch}", "sha": sha} response = self.session.post(url, json=json_data) if response.status_code == 422: logger.warning(f"Branch {new_branch} already exists.") return False response.raise_for_status() return True def get_issues(self, owner, repo): """Download issues from the specified repository and filter those with the aider label.""" url = f"{self.gitea_url}/repos/{owner}/{repo}/issues" response = self.session.get(url) response.raise_for_status() issues = response.json() # Filter to only include issues marked with the "aider" label. issues = [ issue for issue in issues if any(label.get("name") == "aider" for label in issue.get("labels", [])) ] return issues def parse_args(): parser = argparse.ArgumentParser(description="Download issues and create pull requests for a Gitea repository.") parser.add_argument("--gitea-url", required=True, help="Base URL for the Gitea instance, e.g., https://gitfub.space/api/v1") parser.add_argument("--owner", required=True, help="Owner of the repository") parser.add_argument("--repo", required=True, help="Repository name") parser.add_argument("--base-branch", default="main", help="Base branch to use for new branches (default: main)") parser.add_argument("--daemon", action="store_true", help="Run in daemon mode, continuously checking for new issues.") return parser.parse_args() def push_changes(cwd: Path, branch_name: str, issue_number: str, issue_title: str, issue_description: str, base_branch: str) -> None: cmd = [ "git", "push", "origin", f"HEAD:refs/for/{base_branch}", "-o", f"topic={branch_name}", "-o", f"title={issue_title}", "-o", f"description=This pull request resolves #{issue_number}", ] run_cmd(cmd, cwd) def run_cmd(cmd: list[str], cwd:Path|None=None) -> None: print(cmd) subprocess.run(cmd, check=True, cwd=cwd) def process_issue(args, tmpdirname: Path, branch_name: str, issue_title: str, issue_description: str, issue_number: str): repo_url = f"{args.gitea_url}:{args.owner}/{args.repo}.git".replace('https://', 'git@') run_cmd(["git", "clone", repo_url, tmpdirname]) run_cmd(['bash', '-c', AIDER_TEST], tmpdirname) run_cmd(["git", "checkout", args.base_branch], tmpdirname) run_cmd(["git", "checkout", "-b", branch_name], tmpdirname) run_cmd(create_aider_command(f'# {issue_title}\n{issue_description}'), tmpdirname) run_cmd(["git", "add", "."], tmpdirname) push_changes(tmpdirname, branch_name, issue_number, issue_title, issue_description, args.base_branch) def main(): logging.basicConfig(level='INFO') args = parse_args() seen_issues_db = SeenIssuesDB() client = GiteaClient(args.gitea_url, secrets.gitea_token()) try: issues = client.get_issues(args.owner, args.repo) except Exception: logger.exception('Failed to retrieve issues') sys.exit(1) if not issues: logger.info("No issues found.") return while True: for issue in issues: issue_number = issue.get("number") issue_description = issue.get("body", "") title = issue.get("title", f"Issue {issue_number}") issue_text = f"{title}\n{issue_description}" if seen_issues_db.has_seen(issue_text): logger.info(f"Skipping already processed issue #{issue_number}: {title}") continue branch_name = generate_branch_name(issue_number, title) seen_issues_db.mark_as_seen(issue_text) try: with tempfile.TemporaryDirectory() as tmpdirname: process_issue(args, Path(tmpdirname), branch_name, title, issue_description, issue_number) except Exception: logger.exception('Error processing issue') sys.exit(1) if not args.daemon: break logger.info("Daemon mode: sleeping for 60 seconds before checking for new issues.") time.sleep(60) if __name__ == "__main__": main()