Resolving code debt
All checks were successful
Run Python tests (through Pytest) / Test (push) Successful in 25s
Verify Python project can be installed, loaded and have version checked / Test (push) Successful in 23s

This commit is contained in:
Jon Michael Aanes 2025-04-16 00:03:39 +02:00
parent 78d2927b44
commit 7083ca48c0
4 changed files with 104 additions and 118 deletions

View File

@ -70,6 +70,7 @@ The tool uses environment variables for sensitive information:
``` ```
""" """
import dataclasses
import logging import logging
import re import re
import subprocess import subprocess
@ -83,6 +84,27 @@ from ._version import __version__ # noqa: F401
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@dataclasses.dataclass(frozen=True)
class RepositoryConfig:
gitea_url: str
owner: str
repo: str
base_branch: str
def repo_url(self) -> str:
return f'{self.gitea_url}:{self.owner}/{self.repo}.git'.replace(
'https://',
'git@',
)
@dataclasses.dataclass(frozen=True)
class IssueResolution:
success: bool
pull_request_url: str | None = None
pull_request_id: str | None = None
def generate_branch_name(issue_number: str, issue_title: str) -> str: def generate_branch_name(issue_number: str, issue_title: str) -> str:
"""Create a branch name by sanitizing the issue title. """Create a branch name by sanitizing the issue title.
@ -197,29 +219,29 @@ def get_commit_messages(cwd: Path, base_branch: str, current_branch: str) -> lis
capture_output=True, capture_output=True,
text=True, text=True,
) )
return reversed(result.stdout.strip().split('\n')) return list(reversed(result.stdout.strip().split('\n')))
except subprocess.CalledProcessError: except subprocess.CalledProcessError:
logger.exception(f'Failed to get commit messages on branch {current_branch}') logger.exception(f'Failed to get commit messages on branch {current_branch}')
return '' return []
def push_changes( def push_changes(
repository_config: RepositoryConfig,
cwd: Path, cwd: Path,
branch_name: str, branch_name: str,
issue_number: str, issue_number: str,
issue_title: str, issue_title: str,
base_branch: str,
gitea_client, gitea_client,
owner: str, ) -> IssueResolution:
repo: str,
) -> tuple[bool, str, str]:
# Check if there are any commits on the branch before pushing # Check if there are any commits on the branch before pushing
if not has_commits_on_branch(cwd, base_branch, branch_name): if not has_commits_on_branch(cwd, repository_config.base_branch, branch_name):
logger.info('No commits made on branch %s, skipping push', branch_name) logger.info('No commits made on branch %s, skipping push', branch_name)
return False, None, None return IssueResolution(False)
# Get commit messages for PR description # Get commit messages for PR description
commit_messages = get_commit_messages(cwd, base_branch, branch_name) commit_messages = get_commit_messages(
cwd, repository_config.base_branch, branch_name,
)
description = f'This pull request resolves #{issue_number}\n\n' description = f'This pull request resolves #{issue_number}\n\n'
if commit_messages: if commit_messages:
@ -233,17 +255,19 @@ def push_changes(
# Then create the PR with the aider label # Then create the PR with the aider label
pr_response = gitea_client.create_pull_request( pr_response = gitea_client.create_pull_request(
owner=owner, owner=repository_config.owner,
repo=repo, repo=repository_config.repo,
title=issue_title, title=issue_title,
body=description, body=description,
head=branch_name, head=branch_name,
base=base_branch, base=repository_config.base_branch,
labels=['aider'], labels=['aider'],
) )
# Extract PR number and URL if available # Extract PR number and URL if available
return True, str(pr_response.get('number')), pr_response.get('html_url') return IssueResolution(
True, str(pr_response.get('number')), pr_response.get('html_url'),
)
def has_commits_on_branch(cwd: Path, base_branch: str, current_branch: str) -> bool: def has_commits_on_branch(cwd: Path, base_branch: str, current_branch: str) -> bool:
@ -290,26 +314,20 @@ SKIP_AIDER = False
def solve_issue_in_repository( def solve_issue_in_repository(
args, repository_config: RepositoryConfig,
tmpdirname: Path, tmpdirname: Path,
branch_name: str, branch_name: str,
issue_title: str, issue_title: str,
issue_description: str, issue_description: str,
issue_number: str, issue_number: str,
gitea_client, gitea_client,
seen_issues_db, ) -> IssueResolution:
) -> bool:
logger.info('### %s #####', issue_title) logger.info('### %s #####', issue_title)
repo_url = f'{args.gitea_url}:{args.owner}/{args.repo}.git'.replace(
'https://',
'git@',
)
# Setup repository # Setup repository
run_cmd(['git', 'clone', repo_url, tmpdirname]) run_cmd(['git', 'clone', repository_config.repo_url(), tmpdirname])
run_cmd(['bash', '-c', AIDER_TEST], tmpdirname) run_cmd(['bash', '-c', AIDER_TEST], tmpdirname)
run_cmd(['git', 'checkout', args.base_branch], tmpdirname) run_cmd(['git', 'checkout', repository_config.base_branch], tmpdirname)
run_cmd(['git', 'checkout', '-b', branch_name], tmpdirname) run_cmd(['git', 'checkout', '-b', branch_name], tmpdirname)
# Run initial ruff pass before aider # Run initial ruff pass before aider
@ -340,7 +358,7 @@ def solve_issue_in_repository(
succeeded = True succeeded = True
if not succeeded: if not succeeded:
logger.error('Aider invocation failed for issue #%s', issue_number) logger.error('Aider invocation failed for issue #%s', issue_number)
return False return IssueResolution(False)
# Auto-fix standard code quality stuff after aider # Auto-fix standard code quality stuff after aider
run_cmd(['bash', '-c', RUFF_FORMAT_AND_AUTO_FIX], tmpdirname, check=False) run_cmd(['bash', '-c', RUFF_FORMAT_AND_AUTO_FIX], tmpdirname, check=False)
@ -362,74 +380,69 @@ def solve_issue_in_repository(
'Aider did not make any changes beyond the initial ruff pass for issue #%s', 'Aider did not make any changes beyond the initial ruff pass for issue #%s',
issue_number, issue_number,
) )
return False return IssueResolution(False)
# Create issue_text for database tracking
issue_text = f'{issue_title}\n{issue_description}'
# Push changes # Push changes
success, pr_number, pr_url = push_changes( return push_changes(
repository_config,
tmpdirname, tmpdirname,
branch_name, branch_name,
issue_number, issue_number,
issue_title, issue_title,
args.base_branch,
gitea_client, gitea_client,
args.owner,
args.repo,
) )
# Store PR information in the database if available
if success:
seen_issues_db.update_pr_info(issue_text, pr_number, pr_url)
logger.info(
'Stored PR #%s information for issue #%s',
pr_number,
issue_number,
)
return success def solve_issues_in_repository(
repository_config: RepositoryConfig, client, seen_issues_db,
):
def handle_issues(args, client, seen_issues_db):
"""Process all open issues with the 'aider' label. """Process all open issues with the 'aider' label.
Args: Args:
args: Command line arguments. repository_config: Command line arguments.
client: The Gitea client instance. client: The Gitea client instance.
seen_issues_db: Database of previously processed issues. seen_issues_db: Database of previously processed issues.
""" """
try: try:
issues = client.get_issues(args.owner, args.repo) issues = client.get_issues(repository_config.owner, repository_config.repo)
except Exception: except Exception:
logger.exception('Failed to retrieve issues') logger.exception('Failed to retrieve issues')
sys.exit(1) sys.exit(1)
if not issues: if not issues:
logger.info('No issues found for %s', args.repo) logger.info('No issues found for %s', repository_config.repo)
return return
for issue in issues: for issue in issues:
issue_url = issue.get('web_url')
issue_number = issue.get('number') issue_number = issue.get('number')
issue_description = issue.get('body', '') issue_description = issue.get('body', '')
title = issue.get('title', f'Issue {issue_number}') title = issue.get('title', f'Issue {issue_number}')
issue_text = f'{title}\n{issue_description}' if seen_issues_db.has_seen(issue_url):
if seen_issues_db.has_seen(issue_text):
logger.info('Skipping already processed issue #%s: %s', issue_number, title) logger.info('Skipping already processed issue #%s: %s', issue_number, title)
continue continue
branch_name = generate_branch_name(issue_number, title) branch_name = generate_branch_name(issue_number, title)
with tempfile.TemporaryDirectory() as tmpdirname: with tempfile.TemporaryDirectory() as tmpdirname:
solved = solve_issue_in_repository( issue_resolution = solve_issue_in_repository(
args, repository_config,
Path(tmpdirname), Path(tmpdirname),
branch_name, branch_name,
title, title,
issue_description, issue_description,
issue_number, issue_number,
client, client,
seen_issues_db,
) )
if solved: if issue_resolution.success:
seen_issues_db.mark_as_seen(issue_text, str(issue_number)) seen_issues_db.mark_as_seen(issue_url, str(issue_number))
seen_issues_db.update_pr_info(
issue_url,
issue_resolution.pull_request_id,
issue_resolution.pull_request_url,
)
logger.info(
'Stored PR #%s information for issue #%s',
issue_resolution.pull_request_id,
issue_number,
)

View File

@ -7,23 +7,14 @@ It assumes that the default branch (default "main") exists and that you have a v
import argparse import argparse
import logging import logging
import time import time
from dataclasses import dataclass
from . import handle_issues, secrets from . import RepositoryConfig, secrets, solve_issues_in_repository
from .gitea_client import GiteaClient from .gitea_client import GiteaClient
from .seen_issues_db import SeenIssuesDB from .seen_issues_db import SeenIssuesDB
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@dataclass
class AiderArgs:
gitea_url: str
owner: str
repo: str
base_branch: str
def parse_args(): def parse_args():
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
description='Download issues and create pull requests for a Gitea repository.', description='Download issues and create pull requests for a Gitea repository.',
@ -72,13 +63,13 @@ def main():
while True: while True:
logger.info('Checking for new issues...') logger.info('Checking for new issues...')
for repo in repositories: for repo in repositories:
aider_args = AiderArgs( repository_config = RepositoryConfig(
gitea_url=args.gitea_url, gitea_url=args.gitea_url,
owner=args.owner, owner=args.owner,
repo=repo, repo=repo,
base_branch=args.base_branch, base_branch=args.base_branch,
) )
handle_issues(aider_args, client, seen_issues_db) solve_issues_in_repository(repository_config, client, seen_issues_db)
del repo del repo
if not args.daemon: if not args.daemon:
break break

View File

@ -6,7 +6,6 @@ store information about seen issues and their associated pull requests for effic
""" """
import sqlite3 import sqlite3
from hashlib import sha256
DEFAULT_DB_PATH = 'output/seen_issues.db' DEFAULT_DB_PATH = 'output/seen_issues.db'
@ -40,7 +39,7 @@ class SeenIssuesDB:
with self.conn: with self.conn:
self.conn.execute(""" self.conn.execute("""
CREATE TABLE IF NOT EXISTS seen_issues ( CREATE TABLE IF NOT EXISTS seen_issues (
issue_hash TEXT PRIMARY KEY, issue_url TEXT PRIMARY KEY,
issue_number TEXT, issue_number TEXT,
pr_number TEXT, pr_number TEXT,
pr_url TEXT, pr_url TEXT,
@ -50,10 +49,10 @@ class SeenIssuesDB:
def mark_as_seen( def mark_as_seen(
self, self,
issue_text: str, issue_url: str,
issue_number: str = None, issue_number: str | None = None,
pr_number: str = None, pr_number: str | None = None,
pr_url: str = None, pr_url: str | None = None,
): ):
"""Mark an issue as seen in the database. """Mark an issue as seen in the database.
@ -61,79 +60,64 @@ class SeenIssuesDB:
If the issue has already been marked as seen, this operation has no effect. If the issue has already been marked as seen, this operation has no effect.
Args: Args:
issue_text: The text content of the issue to mark as seen. issue_url: The text content of the issue to mark as seen.
issue_number: The issue number. issue_number: The issue number.
pr_number: The pull request number associated with this issue. pr_number: The pull request number associated with this issue.
pr_url: The URL of the pull request associated with this issue. pr_url: The URL of the pull request associated with this issue.
""" """
issue_hash = self._compute_hash(issue_text)
with self.conn: with self.conn:
self.conn.execute( self.conn.execute(
'INSERT OR IGNORE INTO seen_issues (issue_hash, issue_number, pr_number, pr_url) VALUES (?, ?, ?, ?)', 'INSERT OR IGNORE INTO seen_issues (issue_url, issue_number, pr_number, pr_url) VALUES (?, ?, ?, ?)',
(issue_hash, issue_number, pr_number, pr_url), (issue_url, issue_number, pr_number, pr_url),
) )
def has_seen(self, issue_text: str) -> bool: def has_seen(self, issue_url: str) -> bool:
"""Check if an issue has been seen before. """Check if an issue has been seen before.
Computes a hash of the issue text and checks if it exists in the database. Computes a hash of the issue text and checks if it exists in the database.
Args: Args:
issue_text: The text content of the issue to check. issue_url: The text content of the issue to check.
Returns: Returns:
True if the issue has been seen before, False otherwise. True if the issue has been seen before, False otherwise.
""" """
issue_hash = self._compute_hash(issue_text)
cursor = self.conn.execute( cursor = self.conn.execute(
'SELECT 1 FROM seen_issues WHERE issue_hash = ?', 'SELECT 1 FROM seen_issues WHERE issue_url = ?',
(issue_hash,), (issue_url,),
) )
return cursor.fetchone() is not None return cursor.fetchone() is not None
def get_pr_info(self, issue_text: str) -> tuple[str, str] | None: def get_pr_info(self, issue_url: str) -> tuple[str, str] | None:
"""Get pull request information for an issue. """Get pull request information for an issue.
Args: Args:
issue_text: The text content of the issue to check. issue_url: The text content of the issue to check.
Returns: Returns:
A tuple containing (pr_number, pr_url) if found, None otherwise. A tuple containing (pr_number, pr_url) if found, None otherwise.
""" """
issue_hash = self._compute_hash(issue_text)
cursor = self.conn.execute( cursor = self.conn.execute(
'SELECT pr_number, pr_url FROM seen_issues WHERE issue_hash = ?', 'SELECT pr_number, pr_url FROM seen_issues WHERE issue_url = ?',
(issue_hash,), (issue_url,),
) )
result = cursor.fetchone() result = cursor.fetchone()
return result if result else None return result if result else None
def update_pr_info(self, issue_text: str, pr_number: str, pr_url: str) -> bool: def update_pr_info(self, issue_url: str, pr_number: str, pr_url: str) -> bool:
"""Update pull request information for an existing issue. """Update pull request information for an existing issue.
Args: Args:
issue_text: The text content of the issue to update. issue_url: The text content of the issue to update.
pr_number: The pull request number. pr_number: The pull request number.
pr_url: The URL of the pull request. pr_url: The URL of the pull request.
Returns: Returns:
True if the update was successful, False if the issue wasn't found. True if the update was successful, False if the issue wasn't found.
""" """
issue_hash = self._compute_hash(issue_text)
with self.conn: with self.conn:
cursor = self.conn.execute( cursor = self.conn.execute(
'UPDATE seen_issues SET pr_number = ?, pr_url = ? WHERE issue_hash = ?', 'UPDATE seen_issues SET pr_number = ?, pr_url = ? WHERE issue_url = ?',
(pr_number, pr_url, issue_hash), (pr_number, pr_url, issue_url),
) )
return cursor.rowcount > 0 return cursor.rowcount > 0
def _compute_hash(self, text: str) -> str:
"""Compute a SHA-256 hash of the given text.
Args:
text: The text to hash.
Returns:
A hexadecimal string representation of the hash.
"""
return sha256(text.encode('utf-8')).hexdigest()

View File

@ -1,19 +1,19 @@
from pathlib import Path from pathlib import Path
from unittest.mock import MagicMock, patch from unittest.mock import MagicMock, patch
from aider_gitea import solve_issue_in_repository from aider_gitea import IssueResolution, RepositoryConfig, solve_issue_in_repository
REPOSITORY_CONFIG = RepositoryConfig(
gitea_url='https://gitea.example.com',
owner='test-owner',
repo='test-repo',
base_branch='main',
)
class TestSolveIssueInRepository: class TestSolveIssueInRepository:
def setup_method(self): def setup_method(self):
self.args = MagicMock()
self.args.gitea_url = 'https://gitea.example.com'
self.args.owner = 'test-owner'
self.args.repo = 'test-repo'
self.args.base_branch = 'main'
self.gitea_client = MagicMock() self.gitea_client = MagicMock()
self.seen_issues_db = MagicMock()
self.tmpdirname = Path('/tmp/test-repo') self.tmpdirname = Path('/tmp/test-repo')
self.branch_name = 'issue-123-test-branch' self.branch_name = 'issue-123-test-branch'
self.issue_title = 'Test Issue' self.issue_title = 'Test Issue'
@ -33,7 +33,7 @@ class TestSolveIssueInRepository:
): ):
# Setup mocks # Setup mocks
mock_run_cmd.return_value = True mock_run_cmd.return_value = True
mock_push_changes.return_value = ( mock_push_changes.return_value = IssueResolution(
True, True,
'456', '456',
'https://gitea.example.com/test-owner/test-repo/pulls/456', 'https://gitea.example.com/test-owner/test-repo/pulls/456',
@ -50,18 +50,17 @@ class TestSolveIssueInRepository:
# Call the function # Call the function
result = solve_issue_in_repository( result = solve_issue_in_repository(
self.args, REPOSITORY_CONFIG,
self.tmpdirname, self.tmpdirname,
self.branch_name, self.branch_name,
self.issue_title, self.issue_title,
self.issue_description, self.issue_description,
self.issue_number, self.issue_number,
self.gitea_client, self.gitea_client,
self.seen_issues_db,
) )
# Verify results # Verify results
assert result is True assert result.success is True
assert mock_run_cmd.call_count >= 8 # Verify all expected commands were run assert mock_run_cmd.call_count >= 8 # Verify all expected commands were run
mock_push_changes.assert_called_once() mock_push_changes.assert_called_once()
@ -78,7 +77,7 @@ class TestSolveIssueInRepository:
): ):
# Setup mocks # Setup mocks
mock_run_cmd.return_value = True mock_run_cmd.return_value = True
mock_push_changes.return_value = (False, None, None) mock_push_changes.return_value = IssueResolution(False, None, None)
# Mock subprocess.run to return same commit hash and no file changes # Mock subprocess.run to return same commit hash and no file changes
mock_subprocess_run.side_effect = [ mock_subprocess_run.side_effect = [
@ -88,16 +87,15 @@ class TestSolveIssueInRepository:
# Call the function # Call the function
result = solve_issue_in_repository( result = solve_issue_in_repository(
self.args, REPOSITORY_CONFIG,
self.tmpdirname, self.tmpdirname,
self.branch_name, self.branch_name,
self.issue_title, self.issue_title,
self.issue_description, self.issue_description,
self.issue_number, self.issue_number,
self.gitea_client, self.gitea_client,
self.seen_issues_db,
) )
# Verify results # Verify results
assert result is False assert result.success is False
assert mock_push_changes.call_count == 0 # push_changes should not be called assert mock_push_changes.call_count == 0 # push_changes should not be called