303 lines
8.8 KiB
Python
303 lines
8.8 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
This script downloads issues from a given Gitea repository and produces a pull request for each issue.
|
|
It assumes that the default branch (default "main") exists and that you have a valid API token if authentication is required.
|
|
"""
|
|
|
|
import argparse
|
|
import logging
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
import time
|
|
from pathlib import Path
|
|
|
|
import requests
|
|
|
|
from . import secrets
|
|
from .seen_issues_db import SeenIssuesDB
|
|
|
|
|
|
def generate_branch_name(issue_number: str, issue_title: str) -> str:
|
|
"""
|
|
Create a branch name by sanitizing the issue title.
|
|
Non-alphanumeric characters (except spaces) are removed,
|
|
the text is lowercased, and spaces are replaced with dashes.
|
|
"""
|
|
sanitized = re.sub(r'[^0-9a-zA-Z ]+', '', issue_title)
|
|
parts = ['issue', str(issue_number), *sanitized.lower().split()]
|
|
return '-'.join(parts)
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def bash_cmd(*commands: str) -> str:
|
|
return 'bash -c "' + ';'.join(commands) + '"'
|
|
|
|
|
|
AIDER_TEST = bash_cmd(
|
|
'virtualenv venv',
|
|
'source venv/bin/activate',
|
|
'pip install -e .',
|
|
'pytest test',
|
|
)
|
|
|
|
AIDER_LINT = bash_cmd(
|
|
'ruff format',
|
|
'ruff check --fix --ignore RUF022 --ignore PGH004',
|
|
'ruff format',
|
|
'ruff check --ignore RUF022 --ignore PGH004',
|
|
)
|
|
|
|
|
|
LLM_MESSAGE_FORMAT = """
|
|
{issue}
|
|
|
|
# Solution Details
|
|
|
|
For code tasks:
|
|
|
|
1. Create a plan for how to solve the issue.
|
|
2. Write unit tests that proves that your solution works.
|
|
3. Then, solve the issue by writing the required code.
|
|
"""
|
|
|
|
|
|
def create_aider_command(issue: str) -> list[str]:
|
|
return [
|
|
'aider',
|
|
'--chat-language',
|
|
'english',
|
|
'--test-cmd',
|
|
AIDER_TEST,
|
|
'--lint-cmd',
|
|
AIDER_LINT,
|
|
'--auto-test',
|
|
'--no-auto-lint',
|
|
'--api-key',
|
|
secrets.llm_api_key(),
|
|
'--read',
|
|
'CONVENTIONS.md',
|
|
'--message',
|
|
LLM_MESSAGE_FORMAT.format(issue=issue),
|
|
'--yes-always',
|
|
'--architect',
|
|
]
|
|
|
|
|
|
class GiteaClient:
|
|
def __init__(self, gitea_url: str, token: str) -> None:
|
|
assert not gitea_url.endswith('/api/v1')
|
|
self.gitea_url = gitea_url + '/api/v1'
|
|
self.session = requests.Session()
|
|
self.session.headers['Content-Type'] = 'application/json'
|
|
if token:
|
|
self.session.headers['Authorization'] = f'token {token}'
|
|
|
|
def get_default_branch_sha(self, owner, repo, branch):
|
|
"""Retrieve the commit SHA of the default branch."""
|
|
url = f'{self.gitea_url}/repos/{owner}/{repo}/branches/{branch}'
|
|
response = self.session.get(url)
|
|
response.raise_for_status()
|
|
data = response.json()
|
|
return data['commit']['sha']
|
|
|
|
def create_branch(self, owner, repo, new_branch, sha):
|
|
"""Create a new branch from the provided SHA."""
|
|
url = f'{self.gitea_url}/repos/{owner}/{repo}/git/refs'
|
|
json_data = {'ref': f'refs/heads/{new_branch}', 'sha': sha}
|
|
response = self.session.post(url, json=json_data)
|
|
if response.status_code == 422:
|
|
logger.warning(f'Branch {new_branch} already exists.')
|
|
return False
|
|
response.raise_for_status()
|
|
return True
|
|
|
|
def get_issues(self, owner, repo):
|
|
"""Download issues from the specified repository and filter those with the aider label."""
|
|
url = f'{self.gitea_url}/repos/{owner}/{repo}/issues'
|
|
response = self.session.get(url)
|
|
response.raise_for_status()
|
|
issues = response.json()
|
|
# Filter to only include issues marked with the "aider" label.
|
|
issues = [
|
|
issue
|
|
for issue in issues
|
|
if any(label.get('name') == 'aider' for label in issue.get('labels', []))
|
|
]
|
|
return issues
|
|
|
|
|
|
def parse_args():
|
|
parser = argparse.ArgumentParser(
|
|
description='Download issues and create pull requests for a Gitea repository.',
|
|
)
|
|
parser.add_argument(
|
|
'--gitea-url',
|
|
required=True,
|
|
help='Base URL for the Gitea instance, e.g., https://gitfub.space/api/v1',
|
|
)
|
|
parser.add_argument('--owner', required=True, help='Owner of the repository')
|
|
parser.add_argument('--repo', required=True, help='Repository name')
|
|
parser.add_argument(
|
|
'--base-branch',
|
|
default='main',
|
|
help='Base branch to use for new branches (default: main)',
|
|
)
|
|
parser.add_argument(
|
|
'--daemon',
|
|
action='store_true',
|
|
help='Run in daemon mode to continuously monitor for new issues',
|
|
)
|
|
parser.add_argument(
|
|
'--interval',
|
|
type=int,
|
|
default=300,
|
|
help='Interval in seconds between checks in daemon mode (default: 300)',
|
|
)
|
|
return parser.parse_args()
|
|
|
|
|
|
def push_changes(
|
|
cwd: Path,
|
|
branch_name: str,
|
|
issue_number: str,
|
|
issue_title: str,
|
|
issue_description: str,
|
|
base_branch: str,
|
|
) -> bool:
|
|
# Check if there are any commits on the branch before pushing
|
|
if not has_commits_on_branch(cwd, base_branch, branch_name):
|
|
logger.info(f'No commits made on branch {branch_name}, skipping push')
|
|
return False
|
|
cmd = [
|
|
'git',
|
|
'push',
|
|
'origin',
|
|
f'HEAD:refs/for/{base_branch}',
|
|
'-o',
|
|
f'topic={branch_name}',
|
|
'-o',
|
|
f'title={issue_title}',
|
|
'-o',
|
|
f'description=This pull request resolves #{issue_number}',
|
|
]
|
|
run_cmd(cmd, cwd)
|
|
return True
|
|
|
|
|
|
def has_commits_on_branch(cwd: Path, base_branch: str, current_branch: str) -> bool:
|
|
"""Check if there are any commits on the current branch that aren't in the base branch."""
|
|
try:
|
|
result = subprocess.run(
|
|
['git', 'log', f'{base_branch}..{current_branch}', '--oneline'],
|
|
check=True,
|
|
cwd=cwd,
|
|
capture_output=True,
|
|
text=True,
|
|
)
|
|
return bool(result.stdout.strip())
|
|
except subprocess.CalledProcessError:
|
|
logger.exception(f'Failed to check commits on branch {current_branch}')
|
|
return False
|
|
|
|
|
|
def run_cmd(cmd: list[str], cwd: Path | None = None) -> None:
|
|
print(cmd)
|
|
subprocess.run(cmd, check=True, cwd=cwd)
|
|
|
|
|
|
def solve_issue_in_repository(
|
|
args,
|
|
tmpdirname: Path,
|
|
branch_name: str,
|
|
issue_title: str,
|
|
issue_description: str,
|
|
issue_number: str,
|
|
) -> bool:
|
|
repo_url = f'{args.gitea_url}:{args.owner}/{args.repo}.git'.replace(
|
|
'https://',
|
|
'git@',
|
|
)
|
|
run_cmd(['git', 'clone', repo_url, tmpdirname])
|
|
run_cmd(['bash', '-c', AIDER_TEST], tmpdirname)
|
|
run_cmd(['git', 'checkout', args.base_branch], tmpdirname)
|
|
run_cmd(['git', 'checkout', '-b', branch_name], tmpdirname)
|
|
run_cmd(create_aider_command(f'# {issue_title}\n{issue_description}'), tmpdirname)
|
|
run_cmd(['git', 'add', '.'], tmpdirname)
|
|
|
|
return push_changes(
|
|
tmpdirname,
|
|
branch_name,
|
|
issue_number,
|
|
issue_title,
|
|
issue_description,
|
|
args.base_branch,
|
|
)
|
|
|
|
|
|
def handle_issues(args, client, seen_issues_db):
|
|
"""Process all open issues with the 'aider' label."""
|
|
try:
|
|
issues = client.get_issues(args.owner, args.repo)
|
|
except Exception:
|
|
logger.exception('Failed to retrieve issues')
|
|
sys.exit(1)
|
|
|
|
if not issues:
|
|
logger.info('No issues found.')
|
|
return
|
|
|
|
for issue in issues:
|
|
issue_number = issue.get('number')
|
|
issue_description = issue.get('body', '')
|
|
title = issue.get('title', f'Issue {issue_number}')
|
|
issue_text = f'{title}\n{issue_description}'
|
|
if seen_issues_db.has_seen(issue_text):
|
|
logger.info(f'Skipping already processed issue #{issue_number}: {title}')
|
|
continue
|
|
|
|
branch_name = generate_branch_name(issue_number, title)
|
|
with tempfile.TemporaryDirectory() as tmpdirname:
|
|
solved = solve_issue_in_repository(
|
|
args,
|
|
Path(tmpdirname),
|
|
branch_name,
|
|
title,
|
|
issue_description,
|
|
issue_number,
|
|
)
|
|
|
|
if solved:
|
|
seen_issues_db.mark_as_seen(issue_text)
|
|
|
|
|
|
def main():
|
|
logging.basicConfig(level='INFO')
|
|
args = parse_args()
|
|
|
|
seen_issues_db = SeenIssuesDB()
|
|
client = GiteaClient(args.gitea_url, secrets.gitea_token())
|
|
|
|
if args.daemon:
|
|
logger.info(
|
|
f'Starting daemon mode, checking for new issues every {args.interval} seconds',
|
|
)
|
|
try:
|
|
while True:
|
|
logger.info('Checking for new issues...')
|
|
handle_issues(args, client, seen_issues_db)
|
|
logger.info(f'Sleeping for {args.interval} seconds...')
|
|
time.sleep(args.interval)
|
|
except KeyboardInterrupt:
|
|
logger.exception('Daemon stopped by user')
|
|
else:
|
|
# One-off run
|
|
handle_issues(args, client, seen_issues_db)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|