Compare commits

...

5 Commits

Author SHA1 Message Date
08b07b5dda Run ruff after aider
All checks were successful
Run Python tests (through Pytest) / Test (push) Successful in 24s
Verify Python project can be installed, loaded and have version checked / Test (push) Successful in 22s
2025-04-13 18:14:35 +02:00
59e3efaf3c Code quality 2025-04-13 18:11:00 +02:00
8a77769500 Ruff 2025-04-13 18:06:56 +02:00
bdee056b67 Ruff 2025-04-13 18:06:44 +02:00
8c75a10b3a Autolint 2025-04-13 18:06:31 +02:00
7 changed files with 208 additions and 120 deletions

View File

@ -6,4 +6,4 @@ Use [Aider](https://aider.chat/) by creating issues. The program will then
automatically invoke Aider and create a pull request for the issue. automatically invoke Aider and create a pull request for the issue.
""" """
from ._version import __version__ # noqa: F401 from ._version import __version__ # noqa: F401

View File

@ -4,53 +4,60 @@ This script downloads issues from a given Gitea repository and produces a pull r
It assumes that the default branch (default "main") exists and that you have a valid API token if authentication is required. It assumes that the default branch (default "main") exists and that you have a valid API token if authentication is required.
""" """
import logging
from pathlib import Path
import argparse import argparse
import requests import logging
import sys
import dataclasses
import tempfile
import subprocess
import os
import time
import re import re
import subprocess
import sys
import tempfile
import time
from pathlib import Path
import requests
from . import secrets from . import secrets
from .seen_issues_db import SeenIssuesDB from .seen_issues_db import SeenIssuesDB
def generate_branch_name(issue_number: str, issue_title: str) -> str: def generate_branch_name(issue_number: str, issue_title: str) -> str:
""" """
Create a branch name by sanitizing the issue title. Create a branch name by sanitizing the issue title.
Non-alphanumeric characters (except spaces) are removed, Non-alphanumeric characters (except spaces) are removed,
the text is lowercased, and spaces are replaced with dashes. the text is lowercased, and spaces are replaced with dashes.
""" """
sanitized = re.sub(r"[^0-9a-zA-Z ]+", "", issue_title) sanitized = re.sub(r'[^0-9a-zA-Z ]+', '', issue_title)
parts = ['issue', str(issue_number), *sanitized.lower().split()] parts = ['issue', str(issue_number), *sanitized.lower().split()]
return "-".join(parts) return '-'.join(parts)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def bash_cmd(*commands: str) -> str: def bash_cmd(*commands: str) -> str:
return "bash -c \""+';'.join(commands)+"\"" commands = ('set -e', *commands)
return 'bash -c "' + ';'.join(commands) + '"'
AIDER_TEST=bash_cmd(
"virtualenv venv", AIDER_TEST = bash_cmd(
"source venv/bin/activate", 'virtualenv venv',
"pip install -e .", 'source venv/bin/activate',
"pytest test", 'pip install -e .',
'pytest test',
) )
AIDER_LINT=bash_cmd( RUFF_FORMAT_AND_AUTO_FIX = bash_cmd(
"ruff format", 'ruff format',
"ruff check --fix --ignore RUF022 --ignore PGH004", 'ruff check --fix --ignore RUF022 --ignore PGH004',
"ruff format", 'ruff format',
"ruff check --ignore RUF022 --ignore PGH004", 'ruff check --fix --ignore RUF022 --ignore PGH004',
) )
AIDER_LINT = bash_cmd(
RUFF_FORMAT_AND_AUTO_FIX,
'ruff format',
'ruff check --ignore RUF022 --ignore PGH004',
)
#MODEL = 'o3-mini'
#MODEL = 'claude-3-7-sonnet'
LLM_MESSAGE_FORMAT = """ LLM_MESSAGE_FORMAT = """
{issue} {issue}
@ -64,35 +71,41 @@ For code tasks:
3. Then, solve the issue by writing the required code. 3. Then, solve the issue by writing the required code.
""" """
def create_aider_command(issue: str) -> list[str]: def create_aider_command(issue: str) -> list[str]:
return [ return [
'aider', 'aider',
#'--model', MODEL, '--chat-language',
'--chat-language', 'english', 'english',
'--test-cmd', AIDER_TEST, '--test-cmd',
'--lint-cmd', AIDER_LINT, AIDER_TEST,
'--auto-test', '--lint-cmd',
'--no-auto-lint', AIDER_LINT,
'--api-key', secrets.llm_api_key(), '--auto-test',
'--read', 'CONVENTIONS.md', '--no-auto-lint',
'--message', LLM_MESSAGE_FORMAT.format(issue=issue), '--api-key',
'--yes-always', secrets.llm_api_key(),
'--architect', '--read',
'CONVENTIONS.md',
'--message',
LLM_MESSAGE_FORMAT.format(issue=issue),
'--yes-always',
'--architect',
] ]
class GiteaClient:
class GiteaClient:
def __init__(self, gitea_url: str, token: str) -> None: def __init__(self, gitea_url: str, token: str) -> None:
assert not gitea_url.endswith('/api/v1') assert not gitea_url.endswith('/api/v1')
self.gitea_url = gitea_url + '/api/v1' self.gitea_url = gitea_url + '/api/v1'
self.session = requests.Session() self.session = requests.Session()
self.session.headers["Content-Type"] = "application/json" self.session.headers['Content-Type'] = 'application/json'
if token: if token:
self.session.headers["Authorization"] = f"token {token}" self.session.headers['Authorization'] = f'token {token}'
def get_default_branch_sha(self, owner, repo, branch): def get_default_branch_sha(self, owner, repo, branch):
"""Retrieve the commit SHA of the default branch.""" """Retrieve the commit SHA of the default branch."""
url = f"{self.gitea_url}/repos/{owner}/{repo}/branches/{branch}" url = f'{self.gitea_url}/repos/{owner}/{repo}/branches/{branch}'
response = self.session.get(url) response = self.session.get(url)
response.raise_for_status() response.raise_for_status()
data = response.json() data = response.json()
@ -100,87 +113,146 @@ class GiteaClient:
def create_branch(self, owner, repo, new_branch, sha): def create_branch(self, owner, repo, new_branch, sha):
"""Create a new branch from the provided SHA.""" """Create a new branch from the provided SHA."""
url = f"{self.gitea_url}/repos/{owner}/{repo}/git/refs" url = f'{self.gitea_url}/repos/{owner}/{repo}/git/refs'
json_data = {"ref": f"refs/heads/{new_branch}", "sha": sha} json_data = {'ref': f'refs/heads/{new_branch}', 'sha': sha}
response = self.session.post(url, json=json_data) response = self.session.post(url, json=json_data)
if response.status_code == 422: if response.status_code == 422:
logger.warning(f"Branch {new_branch} already exists.") logger.warning(f'Branch {new_branch} already exists.')
return False return False
response.raise_for_status() response.raise_for_status()
return True return True
def get_issues(self, owner, repo): def get_issues(self, owner, repo):
"""Download issues from the specified repository and filter those with the aider label.""" """Download issues from the specified repository and filter those with the aider label."""
url = f"{self.gitea_url}/repos/{owner}/{repo}/issues" url = f'{self.gitea_url}/repos/{owner}/{repo}/issues'
response = self.session.get(url) response = self.session.get(url)
response.raise_for_status() response.raise_for_status()
issues = response.json() issues = response.json()
# Filter to only include issues marked with the "aider" label. # Filter to only include issues marked with the "aider" label.
issues = [ issues = [
issue for issue in issues issue
if any(label.get("name") == "aider" for label in issue.get("labels", [])) for issue in issues
if any(label.get('name') == 'aider' for label in issue.get('labels', []))
] ]
return issues return issues
def parse_args(): def parse_args():
parser = argparse.ArgumentParser(description="Download issues and create pull requests for a Gitea repository.") parser = argparse.ArgumentParser(
parser.add_argument("--gitea-url", required=True, help="Base URL for the Gitea instance, e.g., https://gitfub.space/api/v1") description='Download issues and create pull requests for a Gitea repository.',
parser.add_argument("--owner", required=True, help="Owner of the repository") )
parser.add_argument("--repo", required=True, help="Repository name") parser.add_argument(
parser.add_argument("--base-branch", default="main", help="Base branch to use for new branches (default: main)") '--gitea-url',
parser.add_argument("--daemon", action="store_true", help="Run in daemon mode to continuously monitor for new issues") required=True,
parser.add_argument("--interval", type=int, default=300, help="Interval in seconds between checks in daemon mode (default: 300)") help='Base URL for the Gitea instance, e.g., https://gitfub.space/api/v1',
)
parser.add_argument('--owner', required=True, help='Owner of the repository')
parser.add_argument('--repo', required=True, help='Repository name')
parser.add_argument(
'--base-branch',
default='main',
help='Base branch to use for new branches (default: main)',
)
parser.add_argument(
'--daemon',
action='store_true',
help='Run in daemon mode to continuously monitor for new issues',
)
parser.add_argument(
'--interval',
type=int,
default=300,
help='Interval in seconds between checks in daemon mode (default: 300)',
)
return parser.parse_args() return parser.parse_args()
def push_changes(cwd: Path, branch_name: str, issue_number: str, issue_title: str, issue_description: str, base_branch: str) -> None:
def push_changes(
cwd: Path,
branch_name: str,
issue_number: str,
issue_title: str,
issue_description: str,
base_branch: str,
) -> bool:
# Check if there are any commits on the branch before pushing
if not has_commits_on_branch(cwd, base_branch, branch_name):
logger.info(f'No commits made on branch {branch_name}, skipping push')
return False
cmd = [ cmd = [
"git", 'git',
"push", 'push',
"origin", 'origin',
f"HEAD:refs/for/{base_branch}", f'HEAD:refs/for/{base_branch}',
"-o", '-o',
f"topic={branch_name}", f'topic={branch_name}',
"-o", '-o',
f"title={issue_title}", f'title={issue_title}',
"-o", '-o',
f"description=This pull request resolves #{issue_number}", f'description=This pull request resolves #{issue_number}',
] ]
run_cmd(cmd, cwd) run_cmd(cmd, cwd)
return True
def has_commits_on_branch(cwd: Path, base_branch: str, current_branch: str) -> bool: def has_commits_on_branch(cwd: Path, base_branch: str, current_branch: str) -> bool:
"""Check if there are any commits on the current branch that aren't in the base branch.""" """Check if there are any commits on the current branch that aren't in the base branch."""
try: try:
result = subprocess.run( result = subprocess.run(
["git", "log", f"{base_branch}..{current_branch}", "--oneline"], ['git', 'log', f'{base_branch}..{current_branch}', '--oneline'],
check=True, check=True,
cwd=cwd, cwd=cwd,
capture_output=True, capture_output=True,
text=True text=True,
) )
return bool(result.stdout.strip()) return bool(result.stdout.strip())
except subprocess.CalledProcessError: except subprocess.CalledProcessError:
logger.exception(f"Failed to check commits on branch {current_branch}") logger.exception(f'Failed to check commits on branch {current_branch}')
return False return False
def run_cmd(cmd: list[str], cwd:Path|None=None) -> None:
def run_cmd(cmd: list[str], cwd: Path | None = None) -> None:
print(cmd) print(cmd)
subprocess.run(cmd, check=True, cwd=cwd) subprocess.run(cmd, check=True, cwd=cwd)
def process_issue(args, tmpdirname: Path, branch_name: str, issue_title: str, issue_description: str, issue_number: str): def solve_issue_in_repository(
repo_url = f"{args.gitea_url}:{args.owner}/{args.repo}.git".replace('https://', 'git@') args,
run_cmd(["git", "clone", repo_url, tmpdirname]) tmpdirname: Path,
branch_name: str,
issue_title: str,
issue_description: str,
issue_number: str,
) -> bool:
repo_url = f'{args.gitea_url}:{args.owner}/{args.repo}.git'.replace(
'https://',
'git@',
)
# Setup repository
run_cmd(['git', 'clone', repo_url, tmpdirname])
run_cmd(['bash', '-c', AIDER_TEST], tmpdirname) run_cmd(['bash', '-c', AIDER_TEST], tmpdirname)
run_cmd(["git", "checkout", args.base_branch], tmpdirname) run_cmd(['git', 'checkout', args.base_branch], tmpdirname)
run_cmd(["git", "checkout", "-b", branch_name], tmpdirname) run_cmd(['git', 'checkout', '-b', branch_name], tmpdirname)
# Run aider
run_cmd(create_aider_command(f'# {issue_title}\n{issue_description}'), tmpdirname) run_cmd(create_aider_command(f'# {issue_title}\n{issue_description}'), tmpdirname)
run_cmd(["git", "add", "."], tmpdirname)
# Auto-fix standard code quality stuff
# Check if there are any commits on the branch before pushing run_cmd(['bash', '-c', RUFF_FORMAT_AND_AUTO_FIX], tmpdirname)
if has_commits_on_branch(tmpdirname, args.base_branch, branch_name): run_cmd(['git', 'add', '.'], tmpdirname)
push_changes(tmpdirname, branch_name, issue_number, issue_title, issue_description, args.base_branch) run_cmd(['git', 'commit', '-m', 'Ruff'], tmpdirname)
else:
logger.info(f"No commits made on branch {branch_name}, skipping push") # Push changes
return push_changes(
tmpdirname,
branch_name,
issue_number,
issue_title,
issue_description,
args.base_branch,
)
def handle_issues(args, client, seen_issues_db): def handle_issues(args, client, seen_issues_db):
"""Process all open issues with the 'aider' label.""" """Process all open issues with the 'aider' label."""
@ -191,27 +263,32 @@ def handle_issues(args, client, seen_issues_db):
sys.exit(1) sys.exit(1)
if not issues: if not issues:
logger.info("No issues found.") logger.info('No issues found.')
return return
for issue in issues: for issue in issues:
issue_number = issue.get("number") issue_number = issue.get('number')
issue_description = issue.get("body", "") issue_description = issue.get('body', '')
title = issue.get("title", f"Issue {issue_number}") title = issue.get('title', f'Issue {issue_number}')
issue_text = f"{title}\n{issue_description}" issue_text = f'{title}\n{issue_description}'
if seen_issues_db.has_seen(issue_text): if seen_issues_db.has_seen(issue_text):
logger.info(f"Skipping already processed issue #{issue_number}: {title}") logger.info(f'Skipping already processed issue #{issue_number}: {title}')
continue continue
branch_name = generate_branch_name(issue_number, title) branch_name = generate_branch_name(issue_number, title)
try: with tempfile.TemporaryDirectory() as tmpdirname:
with tempfile.TemporaryDirectory() as tmpdirname: solved = solve_issue_in_repository(
process_issue(args, Path(tmpdirname), branch_name, title, issue_description, issue_number) args,
except Exception: Path(tmpdirname),
logger.exception('Error processing issue') branch_name,
sys.exit(1) title,
issue_description,
issue_number,
)
if solved:
seen_issues_db.mark_as_seen(issue_text)
seen_issues_db.mark_as_seen(issue_text)
def main(): def main():
logging.basicConfig(level='INFO') logging.basicConfig(level='INFO')
@ -221,18 +298,21 @@ def main():
client = GiteaClient(args.gitea_url, secrets.gitea_token()) client = GiteaClient(args.gitea_url, secrets.gitea_token())
if args.daemon: if args.daemon:
logger.info(f"Starting daemon mode, checking for new issues every {args.interval} seconds") logger.info(
f'Starting daemon mode, checking for new issues every {args.interval} seconds',
)
try: try:
while True: while True:
logger.info("Checking for new issues...") logger.info('Checking for new issues...')
handle_issues(args, client, seen_issues_db) handle_issues(args, client, seen_issues_db)
logger.info(f"Sleeping for {args.interval} seconds...") logger.info(f'Sleeping for {args.interval} seconds...')
time.sleep(args.interval) time.sleep(args.interval)
except KeyboardInterrupt: except KeyboardInterrupt:
logger.info("Daemon stopped by user") logger.exception('Daemon stopped by user')
else: else:
# One-off run # One-off run
handle_issues(args, client, seen_issues_db) handle_issues(args, client, seen_issues_db)
if __name__ == "__main__":
if __name__ == '__main__':
main() main()

View File

@ -2,8 +2,10 @@ import secret_loader
SECRETS = secret_loader.SecretLoader() SECRETS = secret_loader.SecretLoader()
def llm_api_key(): def llm_api_key():
return SECRETS.load_or_fail('LLM_API_KEY') return SECRETS.load_or_fail('LLM_API_KEY')
def gitea_token(): def gitea_token():
return SECRETS.load_or_fail('GITEA_TOKEN') return SECRETS.load_or_fail('GITEA_TOKEN')

View File

@ -3,6 +3,7 @@ from hashlib import sha256
DEFAULT_DB_PATH = 'output/seen_issues.db' DEFAULT_DB_PATH = 'output/seen_issues.db'
class SeenIssuesDB: class SeenIssuesDB:
def __init__(self, db_path=DEFAULT_DB_PATH): def __init__(self, db_path=DEFAULT_DB_PATH):
self.conn = sqlite3.connect(db_path) self.conn = sqlite3.connect(db_path)
@ -10,20 +11,26 @@ class SeenIssuesDB:
def _create_table(self): def _create_table(self):
with self.conn: with self.conn:
self.conn.execute(''' self.conn.execute("""
CREATE TABLE IF NOT EXISTS seen_issues ( CREATE TABLE IF NOT EXISTS seen_issues (
issue_hash TEXT PRIMARY KEY issue_hash TEXT PRIMARY KEY
) )
''') """)
def mark_as_seen(self, issue_text: str): def mark_as_seen(self, issue_text: str):
issue_hash = self._compute_hash(issue_text) issue_hash = self._compute_hash(issue_text)
with self.conn: with self.conn:
self.conn.execute('INSERT OR IGNORE INTO seen_issues (issue_hash) VALUES (?)', (issue_hash,)) self.conn.execute(
'INSERT OR IGNORE INTO seen_issues (issue_hash) VALUES (?)',
(issue_hash,),
)
def has_seen(self, issue_text: str) -> bool: def has_seen(self, issue_text: str) -> bool:
issue_hash = self._compute_hash(issue_text) issue_hash = self._compute_hash(issue_text)
cursor = self.conn.execute('SELECT 1 FROM seen_issues WHERE issue_hash = ?', (issue_hash,)) cursor = self.conn.execute(
'SELECT 1 FROM seen_issues WHERE issue_hash = ?',
(issue_hash,),
)
return cursor.fetchone() is not None return cursor.fetchone() is not None
def _compute_hash(self, text: str) -> str: def _compute_hash(self, text: str) -> str:

View File

@ -1,16 +1,18 @@
import pytest
from aider_gitea.__main__ import generate_branch_name from aider_gitea.__main__ import generate_branch_name
def test_generate_branch_name_normal(): def test_generate_branch_name_normal():
# Normal case with alphanumeric title. # Normal case with alphanumeric title.
branch = generate_branch_name("123", "Some Issue Title") branch = generate_branch_name('123', 'Some Issue Title')
assert branch == "issue-123-some-issue-title" assert branch == 'issue-123-some-issue-title'
def test_generate_branch_name_special_characters(): def test_generate_branch_name_special_characters():
branch = generate_branch_name("45", "Issue @ Special!") branch = generate_branch_name('45', 'Issue @ Special!')
assert branch == "issue-45-issue-special" assert branch == 'issue-45-issue-special'
def test_generate_branch_name_numeric_title(): def test_generate_branch_name_numeric_title():
# Test where the title starts with numbers. # Test where the title starts with numbers.
branch = generate_branch_name("789", "123 Numbers Here") branch = generate_branch_name('789', '123 Numbers Here')
assert branch == "issue-789-123-numbers-here" assert branch == 'issue-789-123-numbers-here'

View File

@ -1,5 +1,3 @@
def test_init(): def test_init():
import aider_gitea #noqa: F401 import aider_gitea # noqa: F401
import aider_gitea.secrets #noqa: F401 import aider_gitea.secrets # noqa: F401

View File

@ -1,19 +1,18 @@
import sys
import unittest import unittest
from pathlib import Path
from aider_gitea.seen_issues_db import SeenIssuesDB from aider_gitea.seen_issues_db import SeenIssuesDB
class TestSeenIssuesDB(unittest.TestCase):
class TestSeenIssuesDB(unittest.TestCase):
def setUp(self): def setUp(self):
self.db = SeenIssuesDB(':memory:') self.db = SeenIssuesDB(':memory:')
def test_mark_and_check_seen_issue(self): def test_mark_and_check_seen_issue(self):
issue_text = "Test issue" issue_text = 'Test issue'
self.assertFalse(self.db.has_seen(issue_text)) self.assertFalse(self.db.has_seen(issue_text))
self.db.mark_as_seen(issue_text) self.db.mark_as_seen(issue_text)
self.assertTrue(self.db.has_seen(issue_text)) self.assertTrue(self.db.has_seen(issue_text))
def test_unseen_issue(self): def test_unseen_issue(self):
issue_text = "Unseen issue" issue_text = 'Unseen issue'
self.assertFalse(self.db.has_seen(issue_text)) self.assertFalse(self.db.has_seen(issue_text))