This commit is contained in:
Jon Michael Aanes 2025-04-13 18:06:44 +02:00
parent 8c75a10b3a
commit bdee056b67
7 changed files with 174 additions and 95 deletions

View File

@ -19,33 +19,37 @@ import re
from . import secrets from . import secrets
from .seen_issues_db import SeenIssuesDB from .seen_issues_db import SeenIssuesDB
def generate_branch_name(issue_number: str, issue_title: str) -> str: def generate_branch_name(issue_number: str, issue_title: str) -> str:
""" """
Create a branch name by sanitizing the issue title. Create a branch name by sanitizing the issue title.
Non-alphanumeric characters (except spaces) are removed, Non-alphanumeric characters (except spaces) are removed,
the text is lowercased, and spaces are replaced with dashes. the text is lowercased, and spaces are replaced with dashes.
""" """
sanitized = re.sub(r"[^0-9a-zA-Z ]+", "", issue_title) sanitized = re.sub(r'[^0-9a-zA-Z ]+', '', issue_title)
parts = ['issue', str(issue_number), *sanitized.lower().split()] parts = ['issue', str(issue_number), *sanitized.lower().split()]
return "-".join(parts) return '-'.join(parts)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def bash_cmd(*commands: str) -> str:
return "bash -c \""+';'.join(commands)+"\""
AIDER_TEST=bash_cmd( def bash_cmd(*commands: str) -> str:
"virtualenv venv", return 'bash -c "' + ';'.join(commands) + '"'
"source venv/bin/activate",
"pip install -e .",
"pytest test", AIDER_TEST = bash_cmd(
'virtualenv venv',
'source venv/bin/activate',
'pip install -e .',
'pytest test',
) )
AIDER_LINT=bash_cmd( AIDER_LINT = bash_cmd(
"ruff format", 'ruff format',
"ruff check --fix --ignore RUF022 --ignore PGH004", 'ruff check --fix --ignore RUF022 --ignore PGH004',
"ruff format", 'ruff format',
"ruff check --ignore RUF022 --ignore PGH004", 'ruff check --ignore RUF022 --ignore PGH004',
) )
@ -61,34 +65,41 @@ For code tasks:
3. Then, solve the issue by writing the required code. 3. Then, solve the issue by writing the required code.
""" """
def create_aider_command(issue: str) -> list[str]: def create_aider_command(issue: str) -> list[str]:
return [ return [
'aider', 'aider',
'--chat-language', 'english', '--chat-language',
'--test-cmd', AIDER_TEST, 'english',
'--lint-cmd', AIDER_LINT, '--test-cmd',
AIDER_TEST,
'--lint-cmd',
AIDER_LINT,
'--auto-test', '--auto-test',
'--auto-lint', '--auto-lint',
'--api-key', secrets.llm_api_key(), '--api-key',
'--read', 'CONVENTIONS.md', secrets.llm_api_key(),
'--message', LLM_MESSAGE_FORMAT.format(issue=issue), '--read',
'CONVENTIONS.md',
'--message',
LLM_MESSAGE_FORMAT.format(issue=issue),
'--yes-always', '--yes-always',
'--architect', '--architect',
] ]
class GiteaClient:
class GiteaClient:
def __init__(self, gitea_url: str, token: str) -> None: def __init__(self, gitea_url: str, token: str) -> None:
assert not gitea_url.endswith('/api/v1') assert not gitea_url.endswith('/api/v1')
self.gitea_url = gitea_url + '/api/v1' self.gitea_url = gitea_url + '/api/v1'
self.session = requests.Session() self.session = requests.Session()
self.session.headers["Content-Type"] = "application/json" self.session.headers['Content-Type'] = 'application/json'
if token: if token:
self.session.headers["Authorization"] = f"token {token}" self.session.headers['Authorization'] = f'token {token}'
def get_default_branch_sha(self, owner, repo, branch): def get_default_branch_sha(self, owner, repo, branch):
"""Retrieve the commit SHA of the default branch.""" """Retrieve the commit SHA of the default branch."""
url = f"{self.gitea_url}/repos/{owner}/{repo}/branches/{branch}" url = f'{self.gitea_url}/repos/{owner}/{repo}/branches/{branch}'
response = self.session.get(url) response = self.session.get(url)
response.raise_for_status() response.raise_for_status()
data = response.json() data = response.json()
@ -96,87 +107,135 @@ class GiteaClient:
def create_branch(self, owner, repo, new_branch, sha): def create_branch(self, owner, repo, new_branch, sha):
"""Create a new branch from the provided SHA.""" """Create a new branch from the provided SHA."""
url = f"{self.gitea_url}/repos/{owner}/{repo}/git/refs" url = f'{self.gitea_url}/repos/{owner}/{repo}/git/refs'
json_data = {"ref": f"refs/heads/{new_branch}", "sha": sha} json_data = {'ref': f'refs/heads/{new_branch}', 'sha': sha}
response = self.session.post(url, json=json_data) response = self.session.post(url, json=json_data)
if response.status_code == 422: if response.status_code == 422:
logger.warning(f"Branch {new_branch} already exists.") logger.warning(f'Branch {new_branch} already exists.')
return False return False
response.raise_for_status() response.raise_for_status()
return True return True
def get_issues(self, owner, repo): def get_issues(self, owner, repo):
"""Download issues from the specified repository and filter those with the aider label.""" """Download issues from the specified repository and filter those with the aider label."""
url = f"{self.gitea_url}/repos/{owner}/{repo}/issues" url = f'{self.gitea_url}/repos/{owner}/{repo}/issues'
response = self.session.get(url) response = self.session.get(url)
response.raise_for_status() response.raise_for_status()
issues = response.json() issues = response.json()
# Filter to only include issues marked with the "aider" label. # Filter to only include issues marked with the "aider" label.
issues = [ issues = [
issue for issue in issues issue
if any(label.get("name") == "aider" for label in issue.get("labels", [])) for issue in issues
if any(label.get('name') == 'aider' for label in issue.get('labels', []))
] ]
return issues return issues
def parse_args(): def parse_args():
parser = argparse.ArgumentParser(description="Download issues and create pull requests for a Gitea repository.") parser = argparse.ArgumentParser(
parser.add_argument("--gitea-url", required=True, help="Base URL for the Gitea instance, e.g., https://gitfub.space/api/v1") description='Download issues and create pull requests for a Gitea repository.'
parser.add_argument("--owner", required=True, help="Owner of the repository") )
parser.add_argument("--repo", required=True, help="Repository name") parser.add_argument(
parser.add_argument("--base-branch", default="main", help="Base branch to use for new branches (default: main)") '--gitea-url',
parser.add_argument("--daemon", action="store_true", help="Run in daemon mode to continuously monitor for new issues") required=True,
parser.add_argument("--interval", type=int, default=300, help="Interval in seconds between checks in daemon mode (default: 300)") help='Base URL for the Gitea instance, e.g., https://gitfub.space/api/v1',
)
parser.add_argument('--owner', required=True, help='Owner of the repository')
parser.add_argument('--repo', required=True, help='Repository name')
parser.add_argument(
'--base-branch',
default='main',
help='Base branch to use for new branches (default: main)',
)
parser.add_argument(
'--daemon',
action='store_true',
help='Run in daemon mode to continuously monitor for new issues',
)
parser.add_argument(
'--interval',
type=int,
default=300,
help='Interval in seconds between checks in daemon mode (default: 300)',
)
return parser.parse_args() return parser.parse_args()
def push_changes(cwd: Path, branch_name: str, issue_number: str, issue_title: str, issue_description: str, base_branch: str) -> None:
def push_changes(
cwd: Path,
branch_name: str,
issue_number: str,
issue_title: str,
issue_description: str,
base_branch: str,
) -> None:
cmd = [ cmd = [
"git", 'git',
"push", 'push',
"origin", 'origin',
f"HEAD:refs/for/{base_branch}", f'HEAD:refs/for/{base_branch}',
"-o", '-o',
f"topic={branch_name}", f'topic={branch_name}',
"-o", '-o',
f"title={issue_title}", f'title={issue_title}',
"-o", '-o',
f"description=This pull request resolves #{issue_number}", f'description=This pull request resolves #{issue_number}',
] ]
run_cmd(cmd, cwd) run_cmd(cmd, cwd)
def has_commits_on_branch(cwd: Path, base_branch: str, current_branch: str) -> bool: def has_commits_on_branch(cwd: Path, base_branch: str, current_branch: str) -> bool:
"""Check if there are any commits on the current branch that aren't in the base branch.""" """Check if there are any commits on the current branch that aren't in the base branch."""
try: try:
result = subprocess.run( result = subprocess.run(
["git", "log", f"{base_branch}..{current_branch}", "--oneline"], ['git', 'log', f'{base_branch}..{current_branch}', '--oneline'],
check=True, check=True,
cwd=cwd, cwd=cwd,
capture_output=True, capture_output=True,
text=True text=True,
) )
return bool(result.stdout.strip()) return bool(result.stdout.strip())
except subprocess.CalledProcessError: except subprocess.CalledProcessError:
logger.exception(f"Failed to check commits on branch {current_branch}") logger.exception(f'Failed to check commits on branch {current_branch}')
return False return False
def run_cmd(cmd: list[str], cwd:Path|None=None) -> None:
def run_cmd(cmd: list[str], cwd: Path | None = None) -> None:
print(cmd) print(cmd)
subprocess.run(cmd, check=True, cwd=cwd) subprocess.run(cmd, check=True, cwd=cwd)
def process_issue(args, tmpdirname: Path, branch_name: str, issue_title: str, issue_description: str, issue_number: str): def process_issue(
repo_url = f"{args.gitea_url}:{args.owner}/{args.repo}.git".replace('https://', 'git@') args,
run_cmd(["git", "clone", repo_url, tmpdirname]) tmpdirname: Path,
branch_name: str,
issue_title: str,
issue_description: str,
issue_number: str,
):
repo_url = f'{args.gitea_url}:{args.owner}/{args.repo}.git'.replace(
'https://', 'git@'
)
run_cmd(['git', 'clone', repo_url, tmpdirname])
run_cmd(['bash', '-c', AIDER_TEST], tmpdirname) run_cmd(['bash', '-c', AIDER_TEST], tmpdirname)
run_cmd(["git", "checkout", args.base_branch], tmpdirname) run_cmd(['git', 'checkout', args.base_branch], tmpdirname)
run_cmd(["git", "checkout", "-b", branch_name], tmpdirname) run_cmd(['git', 'checkout', '-b', branch_name], tmpdirname)
run_cmd(create_aider_command(f'# {issue_title}\n{issue_description}'), tmpdirname) run_cmd(create_aider_command(f'# {issue_title}\n{issue_description}'), tmpdirname)
run_cmd(["git", "add", "."], tmpdirname) run_cmd(['git', 'add', '.'], tmpdirname)
# Check if there are any commits on the branch before pushing # Check if there are any commits on the branch before pushing
if has_commits_on_branch(tmpdirname, args.base_branch, branch_name): if has_commits_on_branch(tmpdirname, args.base_branch, branch_name):
push_changes(tmpdirname, branch_name, issue_number, issue_title, issue_description, args.base_branch) push_changes(
tmpdirname,
branch_name,
issue_number,
issue_title,
issue_description,
args.base_branch,
)
else: else:
logger.info(f"No commits made on branch {branch_name}, skipping push") logger.info(f'No commits made on branch {branch_name}, skipping push')
def handle_issues(args, client, seen_issues_db): def handle_issues(args, client, seen_issues_db):
"""Process all open issues with the 'aider' label.""" """Process all open issues with the 'aider' label."""
@ -187,28 +246,36 @@ def handle_issues(args, client, seen_issues_db):
sys.exit(1) sys.exit(1)
if not issues: if not issues:
logger.info("No issues found.") logger.info('No issues found.')
return return
for issue in issues: for issue in issues:
issue_number = issue.get("number") issue_number = issue.get('number')
issue_description = issue.get("body", "") issue_description = issue.get('body', '')
title = issue.get("title", f"Issue {issue_number}") title = issue.get('title', f'Issue {issue_number}')
issue_text = f"{title}\n{issue_description}" issue_text = f'{title}\n{issue_description}'
if seen_issues_db.has_seen(issue_text): if seen_issues_db.has_seen(issue_text):
logger.info(f"Skipping already processed issue #{issue_number}: {title}") logger.info(f'Skipping already processed issue #{issue_number}: {title}')
continue continue
branch_name = generate_branch_name(issue_number, title) branch_name = generate_branch_name(issue_number, title)
try: try:
with tempfile.TemporaryDirectory() as tmpdirname: with tempfile.TemporaryDirectory() as tmpdirname:
process_issue(args, Path(tmpdirname), branch_name, title, issue_description, issue_number) process_issue(
args,
Path(tmpdirname),
branch_name,
title,
issue_description,
issue_number,
)
except Exception: except Exception:
logger.exception('Error processing issue') logger.exception('Error processing issue')
sys.exit(1) sys.exit(1)
seen_issues_db.mark_as_seen(issue_text) seen_issues_db.mark_as_seen(issue_text)
def main(): def main():
logging.basicConfig(level='INFO') logging.basicConfig(level='INFO')
args = parse_args() args = parse_args()
@ -217,18 +284,21 @@ def main():
client = GiteaClient(args.gitea_url, secrets.gitea_token()) client = GiteaClient(args.gitea_url, secrets.gitea_token())
if args.daemon: if args.daemon:
logger.info(f"Starting daemon mode, checking for new issues every {args.interval} seconds") logger.info(
f'Starting daemon mode, checking for new issues every {args.interval} seconds'
)
try: try:
while True: while True:
logger.info("Checking for new issues...") logger.info('Checking for new issues...')
handle_issues(args, client, seen_issues_db) handle_issues(args, client, seen_issues_db)
logger.info(f"Sleeping for {args.interval} seconds...") logger.info(f'Sleeping for {args.interval} seconds...')
time.sleep(args.interval) time.sleep(args.interval)
except KeyboardInterrupt: except KeyboardInterrupt:
logger.info("Daemon stopped by user") logger.info('Daemon stopped by user')
else: else:
# One-off run # One-off run
handle_issues(args, client, seen_issues_db) handle_issues(args, client, seen_issues_db)
if __name__ == "__main__":
if __name__ == '__main__':
main() main()

View File

@ -2,8 +2,10 @@ import secret_loader
SECRETS = secret_loader.SecretLoader() SECRETS = secret_loader.SecretLoader()
def llm_api_key(): def llm_api_key():
return SECRETS.load_or_fail('LLM_API_KEY') return SECRETS.load_or_fail('LLM_API_KEY')
def gitea_token(): def gitea_token():
return SECRETS.load_or_fail('GITEA_TOKEN') return SECRETS.load_or_fail('GITEA_TOKEN')

View File

@ -3,6 +3,7 @@ from hashlib import sha256
DEFAULT_DB_PATH = 'output/seen_issues.db' DEFAULT_DB_PATH = 'output/seen_issues.db'
class SeenIssuesDB: class SeenIssuesDB:
def __init__(self, db_path=DEFAULT_DB_PATH): def __init__(self, db_path=DEFAULT_DB_PATH):
self.conn = sqlite3.connect(db_path) self.conn = sqlite3.connect(db_path)
@ -10,20 +11,25 @@ class SeenIssuesDB:
def _create_table(self): def _create_table(self):
with self.conn: with self.conn:
self.conn.execute(''' self.conn.execute("""
CREATE TABLE IF NOT EXISTS seen_issues ( CREATE TABLE IF NOT EXISTS seen_issues (
issue_hash TEXT PRIMARY KEY issue_hash TEXT PRIMARY KEY
) )
''') """)
def mark_as_seen(self, issue_text: str): def mark_as_seen(self, issue_text: str):
issue_hash = self._compute_hash(issue_text) issue_hash = self._compute_hash(issue_text)
with self.conn: with self.conn:
self.conn.execute('INSERT OR IGNORE INTO seen_issues (issue_hash) VALUES (?)', (issue_hash,)) self.conn.execute(
'INSERT OR IGNORE INTO seen_issues (issue_hash) VALUES (?)',
(issue_hash,),
)
def has_seen(self, issue_text: str) -> bool: def has_seen(self, issue_text: str) -> bool:
issue_hash = self._compute_hash(issue_text) issue_hash = self._compute_hash(issue_text)
cursor = self.conn.execute('SELECT 1 FROM seen_issues WHERE issue_hash = ?', (issue_hash,)) cursor = self.conn.execute(
'SELECT 1 FROM seen_issues WHERE issue_hash = ?', (issue_hash,)
)
return cursor.fetchone() is not None return cursor.fetchone() is not None
def _compute_hash(self, text: str) -> str: def _compute_hash(self, text: str) -> str:

View File

@ -1,16 +1,19 @@
import pytest import pytest
from aider_gitea.__main__ import generate_branch_name from aider_gitea.__main__ import generate_branch_name
def test_generate_branch_name_normal(): def test_generate_branch_name_normal():
# Normal case with alphanumeric title. # Normal case with alphanumeric title.
branch = generate_branch_name("123", "Some Issue Title") branch = generate_branch_name('123', 'Some Issue Title')
assert branch == "issue-123-some-issue-title" assert branch == 'issue-123-some-issue-title'
def test_generate_branch_name_special_characters(): def test_generate_branch_name_special_characters():
branch = generate_branch_name("45", "Issue @ Special!") branch = generate_branch_name('45', 'Issue @ Special!')
assert branch == "issue-45-issue-special" assert branch == 'issue-45-issue-special'
def test_generate_branch_name_numeric_title(): def test_generate_branch_name_numeric_title():
# Test where the title starts with numbers. # Test where the title starts with numbers.
branch = generate_branch_name("789", "123 Numbers Here") branch = generate_branch_name('789', '123 Numbers Here')
assert branch == "issue-789-123-numbers-here" assert branch == 'issue-789-123-numbers-here'

View File

@ -1,5 +1,3 @@
def test_init(): def test_init():
import aider_gitea #noqa: F401 import aider_gitea # noqa: F401
import aider_gitea.secrets #noqa: F401 import aider_gitea.secrets # noqa: F401

View File

@ -3,17 +3,17 @@ import unittest
from pathlib import Path from pathlib import Path
from aider_gitea.seen_issues_db import SeenIssuesDB from aider_gitea.seen_issues_db import SeenIssuesDB
class TestSeenIssuesDB(unittest.TestCase):
class TestSeenIssuesDB(unittest.TestCase):
def setUp(self): def setUp(self):
self.db = SeenIssuesDB(':memory:') self.db = SeenIssuesDB(':memory:')
def test_mark_and_check_seen_issue(self): def test_mark_and_check_seen_issue(self):
issue_text = "Test issue" issue_text = 'Test issue'
self.assertFalse(self.db.has_seen(issue_text)) self.assertFalse(self.db.has_seen(issue_text))
self.db.mark_as_seen(issue_text) self.db.mark_as_seen(issue_text)
self.assertTrue(self.db.has_seen(issue_text)) self.assertTrue(self.db.has_seen(issue_text))
def test_unseen_issue(self): def test_unseen_issue(self):
issue_text = "Unseen issue" issue_text = 'Unseen issue'
self.assertFalse(self.db.has_seen(issue_text)) self.assertFalse(self.db.has_seen(issue_text))