Compare commits

..

3 Commits

Author SHA1 Message Date
e3a39f98b8 Ruff after aider
All checks were successful
Run Python tests (through Pytest) / Test (push) Successful in 25s
Verify Python project can be installed, loaded and have version checked / Test (push) Successful in 22s
2025-04-15 23:42:56 +02:00
d898d91bfa feat: add cost documentation to pull request description 2025-04-15 23:42:52 +02:00
54476b5584 Initial ruff pass 2025-04-15 23:42:29 +02:00
6 changed files with 100 additions and 284 deletions

View File

@ -70,7 +70,6 @@ The tool uses environment variables for sensitive information:
```
"""
import dataclasses
import logging
import re
import subprocess
@ -84,27 +83,6 @@ from ._version import __version__ # noqa: F401
logger = logging.getLogger(__name__)
@dataclasses.dataclass(frozen=True)
class RepositoryConfig:
gitea_url: str
owner: str
repo: str
base_branch: str
def repo_url(self) -> str:
return f'{self.gitea_url}:{self.owner}/{self.repo}.git'.replace(
'https://',
'git@',
)
@dataclasses.dataclass(frozen=True)
class IssueResolution:
success: bool
pull_request_url: str | None = None
pull_request_id: str | None = None
def generate_branch_name(issue_number: str, issue_title: str) -> str:
"""Create a branch name by sanitizing the issue title.
@ -219,29 +197,29 @@ def get_commit_messages(cwd: Path, base_branch: str, current_branch: str) -> lis
capture_output=True,
text=True,
)
return list(reversed(result.stdout.strip().split('\n')))
return reversed(result.stdout.strip().split('\n'))
except subprocess.CalledProcessError:
logger.exception(f'Failed to get commit messages on branch {current_branch}')
return []
return ''
def push_changes(
repository_config: RepositoryConfig,
cwd: Path,
branch_name: str,
issue_number: str,
issue_title: str,
base_branch: str,
gitea_client,
) -> IssueResolution:
owner: str,
repo: str,
) -> bool:
# Check if there are any commits on the branch before pushing
if not has_commits_on_branch(cwd, repository_config.base_branch, branch_name):
if not has_commits_on_branch(cwd, base_branch, branch_name):
logger.info('No commits made on branch %s, skipping push', branch_name)
return IssueResolution(False)
return False
# Get commit messages for PR description
commit_messages = get_commit_messages(
cwd, repository_config.base_branch, branch_name,
)
commit_messages = get_commit_messages(cwd, base_branch, branch_name)
description = f'This pull request resolves #{issue_number}\n\n'
if commit_messages:
@ -249,25 +227,24 @@ def push_changes(
for message in commit_messages:
description += f'- {message}\n'
# Add trailing line documenting costs
description += '\n## Costs\nThis task was solved using AI assistance.'
# First push the branch without creating a PR
cmd = ['git', 'push', 'origin', branch_name, '--force']
run_cmd(cmd, cwd)
# Then create the PR with the aider label
pr_response = gitea_client.create_pull_request(
owner=repository_config.owner,
repo=repository_config.repo,
gitea_client.create_pull_request(
owner=owner,
repo=repo,
title=issue_title,
body=description,
head=branch_name,
base=repository_config.base_branch,
base=base_branch,
labels=['aider'],
)
# Extract PR number and URL if available
return IssueResolution(
True, str(pr_response.get('number')), pr_response.get('html_url'),
)
return True
def has_commits_on_branch(cwd: Path, base_branch: str, current_branch: str) -> bool:
@ -282,9 +259,15 @@ def has_commits_on_branch(cwd: Path, base_branch: str, current_branch: str) -> b
True if there are commits on the current branch not in the base branch, False otherwise.
"""
try:
commit_messages = get_commit_messages(cwd, base_branch, current_branch)
return bool(list(commit_messages))
except Exception:
result = subprocess.run(
['git', 'log', f'{base_branch}..{current_branch}', '--oneline'],
check=True,
cwd=cwd,
capture_output=True,
text=True,
)
return bool(result.stdout.strip())
except subprocess.CalledProcessError:
logger.exception('Failed to check commits on branch %s', current_branch)
return False
@ -308,20 +291,25 @@ SKIP_AIDER = False
def solve_issue_in_repository(
repository_config: RepositoryConfig,
args,
tmpdirname: Path,
branch_name: str,
issue_title: str,
issue_description: str,
issue_number: str,
gitea_client,
) -> IssueResolution:
gitea_client=None,
) -> bool:
logger.info('### %s #####', issue_title)
repo_url = f'{args.gitea_url}:{args.owner}/{args.repo}.git'.replace(
'https://',
'git@',
)
# Setup repository
run_cmd(['git', 'clone', repository_config.repo_url(), tmpdirname])
run_cmd(['git', 'clone', repo_url, tmpdirname])
run_cmd(['bash', '-c', AIDER_TEST], tmpdirname)
run_cmd(['git', 'checkout', repository_config.base_branch], tmpdirname)
run_cmd(['git', 'checkout', args.base_branch], tmpdirname)
run_cmd(['git', 'checkout', '-b', branch_name], tmpdirname)
# Run initial ruff pass before aider
@ -352,7 +340,7 @@ def solve_issue_in_repository(
succeeded = True
if not succeeded:
logger.error('Aider invocation failed for issue #%s', issue_number)
return IssueResolution(False)
return False
# Auto-fix standard code quality stuff after aider
run_cmd(['bash', '-c', RUFF_FORMAT_AND_AUTO_FIX], tmpdirname, check=False)
@ -374,52 +362,52 @@ def solve_issue_in_repository(
'Aider did not make any changes beyond the initial ruff pass for issue #%s',
issue_number,
)
return IssueResolution(False)
return False
# Push changes
return push_changes(
repository_config,
tmpdirname,
branch_name,
issue_number,
issue_title,
args.base_branch,
gitea_client,
args.owner,
args.repo,
)
def solve_issues_in_repository(
repository_config: RepositoryConfig, client, seen_issues_db,
):
def handle_issues(args, client, seen_issues_db):
"""Process all open issues with the 'aider' label.
Args:
repository_config: Command line arguments.
args: Command line arguments.
client: The Gitea client instance.
seen_issues_db: Database of previously processed issues.
"""
try:
issues = client.get_issues(repository_config.owner, repository_config.repo)
issues = client.get_issues(args.owner, args.repo)
except Exception:
logger.exception('Failed to retrieve issues')
sys.exit(1)
if not issues:
logger.info('No issues found for %s', repository_config.repo)
logger.info('No issues found for %s', args.repo)
return
for issue in issues:
issue_url = issue.get('web_url')
issue_number = issue.get('number')
issue_description = issue.get('body', '')
title = issue.get('title', f'Issue {issue_number}')
if seen_issues_db.has_seen(issue_url):
issue_text = f'{title}\n{issue_description}'
if seen_issues_db.has_seen(issue_text):
logger.info('Skipping already processed issue #%s: %s', issue_number, title)
continue
branch_name = generate_branch_name(issue_number, title)
with tempfile.TemporaryDirectory() as tmpdirname:
issue_resolution = solve_issue_in_repository(
repository_config,
solved = solve_issue_in_repository(
args,
Path(tmpdirname),
branch_name,
title,
@ -428,15 +416,5 @@ def solve_issues_in_repository(
client,
)
if issue_resolution.success:
seen_issues_db.mark_as_seen(issue_url, str(issue_number))
seen_issues_db.update_pr_info(
issue_url,
issue_resolution.pull_request_id,
issue_resolution.pull_request_url,
)
logger.info(
'Stored PR #%s information for issue #%s',
issue_resolution.pull_request_id,
issue_number,
)
if solved:
seen_issues_db.mark_as_seen(issue_text)

View File

@ -7,14 +7,23 @@ It assumes that the default branch (default "main") exists and that you have a v
import argparse
import logging
import time
from dataclasses import dataclass
from . import RepositoryConfig, secrets, solve_issues_in_repository
from . import handle_issues, secrets
from .gitea_client import GiteaClient
from .seen_issues_db import SeenIssuesDB
logger = logging.getLogger(__name__)
@dataclass
class AiderArgs:
gitea_url: str
owner: str
repo: str
base_branch: str
def parse_args():
parser = argparse.ArgumentParser(
description='Download issues and create pull requests for a Gitea repository.',
@ -63,13 +72,13 @@ def main():
while True:
logger.info('Checking for new issues...')
for repo in repositories:
repository_config = RepositoryConfig(
aider_args = AiderArgs(
gitea_url=args.gitea_url,
owner=args.owner,
repo=repo,
base_branch=args.base_branch,
)
solve_issues_in_repository(repository_config, client, seen_issues_db)
handle_issues(aider_args, client, seen_issues_db)
del repo
if not args.daemon:
break

View File

@ -1,22 +1,22 @@
"""Database module for tracking previously processed issues and pull requests.
"""Database module for tracking previously processed issues.
This module provides functionality to track which issues have already been processed
by the system to avoid duplicate processing. It uses a simple SQLite database to
store information about seen issues and their associated pull requests for efficient lookup.
store hashes of seen issues for efficient lookup.
"""
import sqlite3
from hashlib import sha256
DEFAULT_DB_PATH = 'output/seen_issues.db'
class SeenIssuesDB:
"""Database handler for tracking processed issues and pull requests.
"""Database handler for tracking processed issues.
This class manages a SQLite database that stores information about issues that have
already been processed and their associated pull requests. It provides methods to mark
issues as seen, check if an issue has been seen before, and retrieve pull request
information for an issue.
This class manages a SQLite database that stores hashes of issues that have
already been processed. It provides methods to mark issues as seen and check
if an issue has been seen before, helping to prevent duplicate processing.
Attributes:
conn: SQLite database connection
@ -34,90 +34,56 @@ class SeenIssuesDB:
def _create_table(self):
"""Create the seen_issues table if it doesn't exist.
Creates a table with columns for storing issue hashes and associated pull request information.
Creates a table with a single column for storing issue hashes.
"""
with self.conn:
self.conn.execute("""
CREATE TABLE IF NOT EXISTS seen_issues (
issue_url TEXT PRIMARY KEY,
issue_number TEXT,
pr_number TEXT,
pr_url TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
issue_hash TEXT PRIMARY KEY
)
""")
def mark_as_seen(
self,
issue_url: str,
issue_number: str | None = None,
pr_number: str | None = None,
pr_url: str | None = None,
):
def mark_as_seen(self, issue_text: str):
"""Mark an issue as seen in the database.
Computes a hash of the issue text and stores it in the database along with pull request information.
Computes a hash of the issue text and stores it in the database.
If the issue has already been marked as seen, this operation has no effect.
Args:
issue_url: The text content of the issue to mark as seen.
issue_number: The issue number.
pr_number: The pull request number associated with this issue.
pr_url: The URL of the pull request associated with this issue.
issue_text: The text content of the issue to mark as seen.
"""
issue_hash = self._compute_hash(issue_text)
with self.conn:
self.conn.execute(
'INSERT OR IGNORE INTO seen_issues (issue_url, issue_number, pr_number, pr_url) VALUES (?, ?, ?, ?)',
(issue_url, issue_number, pr_number, pr_url),
'INSERT OR IGNORE INTO seen_issues (issue_hash) VALUES (?)',
(issue_hash,),
)
def has_seen(self, issue_url: str) -> bool:
def has_seen(self, issue_text: str) -> bool:
"""Check if an issue has been seen before.
Computes a hash of the issue text and checks if it exists in the database.
Args:
issue_url: The text content of the issue to check.
issue_text: The text content of the issue to check.
Returns:
True if the issue has been seen before, False otherwise.
"""
issue_hash = self._compute_hash(issue_text)
cursor = self.conn.execute(
'SELECT 1 FROM seen_issues WHERE issue_url = ?',
(issue_url,),
'SELECT 1 FROM seen_issues WHERE issue_hash = ?',
(issue_hash,),
)
return cursor.fetchone() is not None
def get_pr_info(self, issue_url: str) -> tuple[str, str] | None:
"""Get pull request information for an issue.
def _compute_hash(self, text: str) -> str:
"""Compute a SHA-256 hash of the given text.
Args:
issue_url: The text content of the issue to check.
text: The text to hash.
Returns:
A tuple containing (pr_number, pr_url) if found, None otherwise.
A hexadecimal string representation of the hash.
"""
cursor = self.conn.execute(
'SELECT pr_number, pr_url FROM seen_issues WHERE issue_url = ?',
(issue_url,),
)
result = cursor.fetchone()
return result if result else None
def update_pr_info(self, issue_url: str, pr_number: str, pr_url: str) -> bool:
"""Update pull request information for an existing issue.
Args:
issue_url: The text content of the issue to update.
pr_number: The pull request number.
pr_url: The URL of the pull request.
Returns:
True if the update was successful, False if the issue wasn't found.
"""
with self.conn:
cursor = self.conn.execute(
'UPDATE seen_issues SET pr_number = ?, pr_url = ? WHERE issue_url = ?',
(pr_number, pr_url, issue_url),
)
return cursor.rowcount > 0
return sha256(text.encode('utf-8')).hexdigest()

View File

@ -1,54 +0,0 @@
from pathlib import Path
from unittest.mock import patch
from aider_gitea import has_commits_on_branch
class TestHasCommitsOnBranch:
def setup_method(self):
self.cwd = Path('/tmp/test-repo')
self.base_branch = 'main'
self.current_branch = 'feature-branch'
@patch('aider_gitea.get_commit_messages')
def test_has_commits_true(self, mock_get_commit_messages):
# Setup mock to return some commit messages
mock_get_commit_messages.return_value = ['Commit 1', 'Commit 2']
# Test function returns True when there are commits
assert (
has_commits_on_branch(self.cwd, self.base_branch, self.current_branch)
is True
)
# Verify get_commit_messages was called with correct arguments
mock_get_commit_messages.assert_called_once_with(
self.cwd, self.base_branch, self.current_branch,
)
@patch('aider_gitea.get_commit_messages')
def test_has_commits_false(self, mock_get_commit_messages):
# Setup mock to return empty list
mock_get_commit_messages.return_value = []
# Test function returns False when there are no commits
assert (
has_commits_on_branch(self.cwd, self.base_branch, self.current_branch)
is False
)
# Verify get_commit_messages was called with correct arguments
mock_get_commit_messages.assert_called_once_with(
self.cwd, self.base_branch, self.current_branch,
)
@patch('aider_gitea.get_commit_messages')
def test_has_commits_exception(self, mock_get_commit_messages):
# Setup mock to raise an exception
mock_get_commit_messages.side_effect = Exception('Git command failed')
# Test function returns False when an exception occurs
assert (
has_commits_on_branch(self.cwd, self.base_branch, self.current_branch)
is False
)

View File

@ -1,77 +0,0 @@
import os
import tempfile
from aider_gitea.seen_issues_db import SeenIssuesDB
class TestSeenIssuesDBPRInfo:
def setup_method(self):
# Create a temporary database file
self.db_fd, self.db_path = tempfile.mkstemp()
self.db = SeenIssuesDB(self.db_path)
# Test data
self.issue_text = 'Test issue title\nTest issue description'
self.issue_number = '123'
self.pr_number = '456'
self.pr_url = 'https://gitea.example.com/owner/repo/pulls/456'
def teardown_method(self):
# Close and remove the temporary database
self.db.conn.close()
os.close(self.db_fd)
os.unlink(self.db_path)
def test_mark_as_seen_with_pr_info(self):
# Mark an issue as seen with PR info
self.db.mark_as_seen(
self.issue_text,
issue_number=self.issue_number,
pr_number=self.pr_number,
pr_url=self.pr_url,
)
# Verify the issue is marked as seen
assert self.db.has_seen(self.issue_text)
# Verify PR info is stored correctly
pr_info = self.db.get_pr_info(self.issue_text)
assert pr_info is not None
assert pr_info[0] == self.pr_number
assert pr_info[1] == self.pr_url
def test_update_pr_info(self):
# First mark the issue as seen without PR info
self.db.mark_as_seen(self.issue_text, issue_number=self.issue_number)
# Verify no PR info is available
assert self.db.get_pr_info(self.issue_text) == (None, None)
# Update with PR info
updated = self.db.update_pr_info(self.issue_text, self.pr_number, self.pr_url)
# Verify update was successful
assert updated
# Verify PR info is now available
pr_info = self.db.get_pr_info(self.issue_text)
assert pr_info[0] == self.pr_number
assert pr_info[1] == self.pr_url
def test_update_nonexistent_issue(self):
# Try to update PR info for an issue that doesn't exist
updated = self.db.update_pr_info(
'Nonexistent issue',
self.pr_number,
self.pr_url,
)
# Verify update failed
assert not updated
def test_get_pr_info_nonexistent(self):
# Try to get PR info for an issue that doesn't exist
pr_info = self.db.get_pr_info('Nonexistent issue')
# Verify no PR info is available
assert pr_info is None

View File

@ -1,18 +1,17 @@
from pathlib import Path
from unittest.mock import MagicMock, patch
from aider_gitea import IssueResolution, RepositoryConfig, solve_issue_in_repository
REPOSITORY_CONFIG = RepositoryConfig(
gitea_url='https://gitea.example.com',
owner='test-owner',
repo='test-repo',
base_branch='main',
)
from aider_gitea import solve_issue_in_repository
class TestSolveIssueInRepository:
def setup_method(self):
self.args = MagicMock()
self.args.gitea_url = 'https://gitea.example.com'
self.args.owner = 'test-owner'
self.args.repo = 'test-repo'
self.args.base_branch = 'main'
self.gitea_client = MagicMock()
self.tmpdirname = Path('/tmp/test-repo')
self.branch_name = 'issue-123-test-branch'
@ -33,11 +32,7 @@ class TestSolveIssueInRepository:
):
# Setup mocks
mock_run_cmd.return_value = True
mock_push_changes.return_value = IssueResolution(
True,
'456',
'https://gitea.example.com/test-owner/test-repo/pulls/456',
)
mock_push_changes.return_value = True
# Mock subprocess.run to return different commit hashes and file changes
mock_subprocess_run.side_effect = [
@ -50,7 +45,7 @@ class TestSolveIssueInRepository:
# Call the function
result = solve_issue_in_repository(
REPOSITORY_CONFIG,
self.args,
self.tmpdirname,
self.branch_name,
self.issue_title,
@ -60,7 +55,7 @@ class TestSolveIssueInRepository:
)
# Verify results
assert result.success is True
assert result is True
assert mock_run_cmd.call_count >= 8 # Verify all expected commands were run
mock_push_changes.assert_called_once()
@ -77,7 +72,6 @@ class TestSolveIssueInRepository:
):
# Setup mocks
mock_run_cmd.return_value = True
mock_push_changes.return_value = IssueResolution(False, None, None)
# Mock subprocess.run to return same commit hash and no file changes
mock_subprocess_run.side_effect = [
@ -87,7 +81,7 @@ class TestSolveIssueInRepository:
# Call the function
result = solve_issue_in_repository(
REPOSITORY_CONFIG,
self.args,
self.tmpdirname,
self.branch_name,
self.issue_title,
@ -97,5 +91,5 @@ class TestSolveIssueInRepository:
)
# Verify results
assert result.success is False
assert result is False
assert mock_push_changes.call_count == 0 # push_changes should not be called