Compare commits
2 Commits
42e32faba7
...
02de35e1b0
Author | SHA1 | Date | |
---|---|---|---|
02de35e1b0 | |||
eb71b3b961 |
|
@ -145,77 +145,19 @@ from pathlib import Path
|
|||
|
||||
from . import secrets
|
||||
from ._version import __version__ # noqa: F401
|
||||
from .seen_issues_db import SeenIssuesDB as SeenIssuesDB
|
||||
from .seen_issues_db import SeenIssuesDB
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Constants for common configuration values
|
||||
DEFAULT_TIMEOUT_SECONDS = 10_000
|
||||
DEFAULT_DAEMON_INTERVAL_SECONDS = 300
|
||||
DEFAULT_BASE_BRANCH = 'main'
|
||||
DEFAULT_PR_CONTEXT_LINES = 3
|
||||
DEFAULT_LOG_TAIL_LINES = 100
|
||||
DEFAULT_MINIMUM_PYTHON_VERSION = '3.9'
|
||||
|
||||
# Git and repository constants
|
||||
INITIAL_RUFF_COMMIT_MESSAGE = 'Initial ruff pass'
|
||||
RUFF_CLEANUP_COMMIT_MESSAGE = 'Ruff after'
|
||||
PIPELINE_RESOLUTION_COMMIT_PREFIX = 'Resolve pipeline'
|
||||
COMMENT_RESOLUTION_COMMIT_PREFIX = 'Resolve comment'
|
||||
|
||||
# API and model configuration constants
|
||||
DEFAULT_ANTHROPIC_INDICATORS = [
|
||||
'claude',
|
||||
'anthropic',
|
||||
'sonnet',
|
||||
'haiku',
|
||||
'opus',
|
||||
]
|
||||
|
||||
# Error messages as constants
|
||||
ERROR_GITEA_URL_EMPTY = 'gitea_url cannot be empty'
|
||||
ERROR_OWNER_EMPTY = 'owner cannot be empty'
|
||||
ERROR_REPO_EMPTY = 'repo cannot be empty'
|
||||
ERROR_BASE_BRANCH_EMPTY = 'base_branch cannot be empty'
|
||||
ERROR_GITEA_URL_API_SUFFIX = "gitea_url should not include '/api/v1' suffix"
|
||||
ERROR_ISSUE_PARAMS_TYPE = 'Both issue_number and issue_title must be strings'
|
||||
ERROR_ISSUE_NUMBER_EMPTY = 'Issue number cannot be empty'
|
||||
ERROR_MODEL_TYPE = 'Model must be a string, got {}'
|
||||
|
||||
|
||||
@dataclasses.dataclass(frozen=True)
|
||||
class RepositoryConfig:
|
||||
"""Configuration for a repository to process issues from.
|
||||
|
||||
Attributes:
|
||||
gitea_url: Base URL for the Gitea instance (without '/api/v1').
|
||||
owner: Owner/organization name of the repository.
|
||||
repo: Repository name.
|
||||
base_branch: Base branch name to create feature branches from.
|
||||
"""
|
||||
|
||||
gitea_url: str
|
||||
owner: str
|
||||
repo: str
|
||||
base_branch: str
|
||||
|
||||
def __post_init__(self):
|
||||
"""Validate repository configuration fields."""
|
||||
if not self.gitea_url or not self.gitea_url.strip():
|
||||
raise ValueError(ERROR_GITEA_URL_EMPTY)
|
||||
if not self.owner or not self.owner.strip():
|
||||
raise ValueError(ERROR_OWNER_EMPTY)
|
||||
if not self.repo or not self.repo.strip():
|
||||
raise ValueError(ERROR_REPO_EMPTY)
|
||||
if not self.base_branch or not self.base_branch.strip():
|
||||
raise ValueError(ERROR_BASE_BRANCH_EMPTY)
|
||||
|
||||
# Ensure gitea_url doesn't end with /api/v1 (common mistake)
|
||||
if self.gitea_url.rstrip('/').endswith('/api/v1'):
|
||||
raise ValueError(ERROR_GITEA_URL_API_SUFFIX)
|
||||
|
||||
def repo_url(self) -> str:
|
||||
"""Generate the git clone URL for this repository."""
|
||||
return f'{self.gitea_url}:{self.owner}/{self.repo}.git'.replace(
|
||||
'https://',
|
||||
'git@',
|
||||
|
@ -245,20 +187,8 @@ def generate_branch_name(issue_number: str, issue_title: str) -> str:
|
|||
|
||||
Returns:
|
||||
A sanitized branch name combining the issue number and title.
|
||||
|
||||
Raises:
|
||||
ValueError: If issue_number is empty or invalid.
|
||||
TypeError: If arguments are not strings.
|
||||
"""
|
||||
if not isinstance(issue_number, str) or not isinstance(issue_title, str):
|
||||
raise TypeError(ERROR_ISSUE_PARAMS_TYPE)
|
||||
|
||||
if not issue_number.strip():
|
||||
raise ValueError(ERROR_ISSUE_NUMBER_EMPTY)
|
||||
|
||||
# Sanitize the title, handling empty titles gracefully
|
||||
sanitized_title = issue_title.strip() if issue_title else 'untitled'
|
||||
sanitized = re.sub(r'[^0-9a-zA-Z ]+', '', sanitized_title)
|
||||
sanitized = re.sub(r'[^0-9a-zA-Z ]+', '', issue_title)
|
||||
parts = ['issue', str(issue_number), *sanitized.lower().split()]
|
||||
return '-'.join(parts)
|
||||
|
||||
|
@ -332,7 +262,7 @@ def run_post_solver_cleanup(repository_path: Path, solver_name: str) -> None:
|
|||
run_cmd(['bash', '-c', RUFF_FORMAT_AND_AUTO_FIX], repository_path, check=False)
|
||||
run_cmd(['git', 'add', '.'], repository_path)
|
||||
run_cmd(
|
||||
['git', 'commit', '-m', f'{RUFF_CLEANUP_COMMIT_MESSAGE} {solver_name}'],
|
||||
['git', 'commit', '-m', f'Ruff after {solver_name}'],
|
||||
repository_path,
|
||||
check=False,
|
||||
)
|
||||
|
@ -376,7 +306,7 @@ class AiderCodeSolver(CodeSolverStrategy):
|
|||
'--yes',
|
||||
'--disable-playwright',
|
||||
'--timeout',
|
||||
str(DEFAULT_TIMEOUT_SECONDS),
|
||||
str(10_000),
|
||||
]
|
||||
|
||||
if edit_format := MODEL_EDIT_MODES.get(CODE_MODEL):
|
||||
|
@ -477,27 +407,17 @@ class ClaudeCodeSolver(CodeSolverStrategy):
|
|||
|
||||
|
||||
def is_anthropic_model(model: str) -> bool:
|
||||
"""Check if the model string indicates an Anthropic/Claude model.
|
||||
|
||||
Args:
|
||||
model: The model name/identifier to check.
|
||||
|
||||
Returns:
|
||||
True if the model appears to be an Anthropic model, False otherwise.
|
||||
|
||||
Raises:
|
||||
TypeError: If model is not a string.
|
||||
"""
|
||||
if model is None:
|
||||
"""Check if the model string indicates an Anthropic/Claude model."""
|
||||
if not model:
|
||||
return False
|
||||
|
||||
if not isinstance(model, str):
|
||||
raise TypeError(ERROR_MODEL_TYPE.format(type(model)))
|
||||
|
||||
if not model.strip():
|
||||
return False
|
||||
|
||||
anthropic_indicators = DEFAULT_ANTHROPIC_INDICATORS
|
||||
anthropic_indicators = [
|
||||
'claude',
|
||||
'anthropic',
|
||||
'sonnet',
|
||||
'haiku',
|
||||
'opus',
|
||||
]
|
||||
|
||||
model_lower = model.lower()
|
||||
return any(indicator in model_lower for indicator in anthropic_indicators)
|
||||
|
@ -739,11 +659,7 @@ def solve_issue_in_repository(
|
|||
# Run initial ruff pass before code solver
|
||||
run_cmd(['bash', '-c', RUFF_FORMAT_AND_AUTO_FIX], repository_path, check=False)
|
||||
run_cmd(['git', 'add', '.'], repository_path)
|
||||
run_cmd(
|
||||
['git', 'commit', '-m', INITIAL_RUFF_COMMIT_MESSAGE],
|
||||
repository_path,
|
||||
check=False,
|
||||
)
|
||||
run_cmd(['git', 'commit', '-m', 'Initial ruff pass'], repository_path, check=False)
|
||||
|
||||
# Run code solver
|
||||
issue_content = f'# {issue_title}\n{issue_description}'
|
||||
|
@ -809,10 +725,10 @@ def solve_issues_in_repository(
|
|||
return
|
||||
|
||||
for issue in issues:
|
||||
issue_url = issue.get('web_url')
|
||||
issue_number = issue.get('number')
|
||||
issue_description = issue.get('body', '')
|
||||
title = issue.get('title', f'Issue {issue_number}')
|
||||
issue_url = issue.html_url
|
||||
issue_number = str(issue.number)
|
||||
issue_description = issue.body
|
||||
title = issue.title
|
||||
if seen_issues_db.has_seen(issue_url):
|
||||
logger.info('Skipping already processed issue #%s: %s', issue_number, title)
|
||||
else:
|
||||
|
@ -885,26 +801,14 @@ def handle_pr_comments(
|
|||
for comment in comments:
|
||||
path = comment.get('path')
|
||||
line = comment.get('line') or comment.get('position') or 0
|
||||
|
||||
if not path:
|
||||
logger.warning('Comment has no path, skipping')
|
||||
continue
|
||||
|
||||
file_path = repository_path / path
|
||||
try:
|
||||
lines = file_path.read_text(encoding='utf-8').splitlines()
|
||||
start = max(0, line - DEFAULT_PR_CONTEXT_LINES)
|
||||
end = min(len(lines), line + DEFAULT_PR_CONTEXT_LINES - 1)
|
||||
lines = file_path.read_text().splitlines()
|
||||
start = max(0, line - 3)
|
||||
end = min(len(lines), line + 2)
|
||||
context = '\n'.join(lines[start:end])
|
||||
except FileNotFoundError:
|
||||
logger.warning('File %s not found for comment context', path)
|
||||
context = f'File {path} not found'
|
||||
except UnicodeDecodeError as e:
|
||||
logger.warning('Failed to decode file %s: %s', path, e)
|
||||
context = f'Unable to read file {path} (encoding issue)'
|
||||
except Exception as e:
|
||||
logger.warning('Failed to read file %s for comment context: %s', path, e)
|
||||
context = f'Error reading file {path}'
|
||||
except Exception:
|
||||
context = ''
|
||||
body = comment.get('body', '')
|
||||
issue = (
|
||||
f'Resolve the following reviewer comment:\n{body}\n\n'
|
||||
|
@ -915,12 +819,7 @@ def handle_pr_comments(
|
|||
# commit and push changes for this comment
|
||||
run_cmd(['git', 'add', path], repository_path, check=False)
|
||||
run_cmd(
|
||||
[
|
||||
'git',
|
||||
'commit',
|
||||
'-m',
|
||||
f'{COMMENT_RESOLUTION_COMMIT_PREFIX} {comment.get("id")}',
|
||||
],
|
||||
['git', 'commit', '-m', f'Resolve comment {comment.get("id")}'],
|
||||
repository_path,
|
||||
check=False,
|
||||
)
|
||||
|
@ -951,17 +850,12 @@ def handle_failing_pipelines(
|
|||
run_id,
|
||||
)
|
||||
lines = log.strip().split('\n')
|
||||
context = '\n'.join(lines[-DEFAULT_LOG_TAIL_LINES:])
|
||||
context = '\n'.join(lines[-100:])
|
||||
issue = f'Resolve the following failing pipeline run {run_id}:\n\n{context}'
|
||||
code_solver.solve_issue_round(repository_path, issue)
|
||||
run_cmd(['git', 'add', '.'], repository_path, check=False)
|
||||
run_cmd(
|
||||
[
|
||||
'git',
|
||||
'commit',
|
||||
'-m',
|
||||
f'{PIPELINE_RESOLUTION_COMMIT_PREFIX} {run_id}',
|
||||
],
|
||||
['git', 'commit', '-m', f'Resolve pipeline {run_id}'],
|
||||
repository_path,
|
||||
check=False,
|
||||
)
|
||||
|
|
|
@ -8,13 +8,9 @@ import argparse
|
|||
import logging
|
||||
import time
|
||||
|
||||
from . import (
|
||||
DEFAULT_BASE_BRANCH,
|
||||
DEFAULT_DAEMON_INTERVAL_SECONDS,
|
||||
RepositoryConfig,
|
||||
secrets,
|
||||
solve_issues_in_repository,
|
||||
)
|
||||
import requests
|
||||
|
||||
from . import RepositoryConfig, secrets, solve_issues_in_repository
|
||||
from .gitea_client import GiteaClient
|
||||
from .seen_issues_db import SeenIssuesDB
|
||||
|
||||
|
@ -37,8 +33,8 @@ def parse_args():
|
|||
)
|
||||
parser.add_argument(
|
||||
'--base-branch',
|
||||
default=DEFAULT_BASE_BRANCH,
|
||||
help=f'Base branch to use for new branches (default: {DEFAULT_BASE_BRANCH})',
|
||||
default='main',
|
||||
help='Base branch to use for new branches (default: main)',
|
||||
)
|
||||
parser.add_argument(
|
||||
'--daemon',
|
||||
|
@ -48,8 +44,8 @@ def parse_args():
|
|||
parser.add_argument(
|
||||
'--interval',
|
||||
type=int,
|
||||
default=DEFAULT_DAEMON_INTERVAL_SECONDS,
|
||||
help=f'Interval in seconds between checks in daemon mode (default: {DEFAULT_DAEMON_INTERVAL_SECONDS})',
|
||||
default=30,
|
||||
help='Interval in seconds between checks in daemon mode (default: 300)',
|
||||
)
|
||||
parser.add_argument(
|
||||
'--aider-model',
|
||||
|
@ -75,7 +71,8 @@ def main():
|
|||
core.EVALUATOR_MODEL = args.evaluator_model
|
||||
|
||||
seen_issues_db = SeenIssuesDB()
|
||||
client = GiteaClient(args.gitea_url, secrets.gitea_token())
|
||||
session = requests.Session()
|
||||
client = GiteaClient(session, gitea_url=args.gitea_url, token=secrets.gitea_token())
|
||||
|
||||
if args.repo:
|
||||
repositories = [args.repo]
|
||||
|
|
|
@ -1,10 +1,25 @@
|
|||
import logging
|
||||
from collections.abc import Iterator
|
||||
from typing import Any
|
||||
|
||||
import requests
|
||||
|
||||
from .models import (
|
||||
GiteaIssue,
|
||||
GiteaLabel,
|
||||
GiteaUser,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Module-level constants
|
||||
API_VERSION_PATH = '/api/v1'
|
||||
DEFAULT_CONTENT_TYPE = 'application/json'
|
||||
AIDER_LABEL_NAME = 'aider'
|
||||
SUCCESS_CONCLUSION = 'success'
|
||||
CONFLICT_STATUS_CODE = 409
|
||||
UNPROCESSABLE_ENTITY_STATUS_CODE = 422
|
||||
|
||||
|
||||
class GiteaClient:
|
||||
"""Client for interacting with the Gitea API.
|
||||
|
@ -14,121 +29,191 @@ class GiteaClient:
|
|||
|
||||
Read more about the Gitea API here: https://gitea.com/api/swagger
|
||||
|
||||
Follows the standardized client format:
|
||||
1. Constructor takes a requests.Session object
|
||||
2. All secrets are provided via keyword arguments
|
||||
3. ROOT_URL constant field for constructing URLs
|
||||
|
||||
Attributes:
|
||||
gitea_url (str): The base URL for the Gitea API endpoints.
|
||||
session (requests.Session): HTTP session for making API requests.
|
||||
ROOT_URL (str): The base URL for the Gitea API endpoints.
|
||||
"""
|
||||
|
||||
def __init__(self, gitea_url: str, token: str) -> None:
|
||||
def __init__(
|
||||
self,
|
||||
session: requests.Session,
|
||||
*,
|
||||
gitea_url: str,
|
||||
token: str = '',
|
||||
) -> None:
|
||||
"""Initialize a new Gitea API client.
|
||||
|
||||
Args:
|
||||
gitea_url (str): Base URL for the Gitea instance (without '/api/v1').
|
||||
token (str): Authentication token for the Gitea API. If empty, requests will be unauthenticated.
|
||||
session: HTTP session object to use for requests.
|
||||
gitea_url: Base URL for the Gitea instance (without '/api/v1').
|
||||
token: Authentication token for the Gitea API. If empty, requests will be unauthenticated.
|
||||
|
||||
Raises:
|
||||
AssertionError: If gitea_url ends with '/api/v1'.
|
||||
"""
|
||||
assert not gitea_url.endswith('/api/v1')
|
||||
self.gitea_url = gitea_url + '/api/v1'
|
||||
self.session = requests.Session()
|
||||
self.session.headers['Content-Type'] = 'application/json'
|
||||
assert not gitea_url.endswith(API_VERSION_PATH)
|
||||
self.session = session
|
||||
self.ROOT_URL = gitea_url + API_VERSION_PATH
|
||||
self.session.headers['Content-Type'] = DEFAULT_CONTENT_TYPE
|
||||
if token:
|
||||
self.session.headers['Authorization'] = f'token {token}'
|
||||
|
||||
def get_default_branch_sha(self, owner: str, repo: str, branch: str) -> str:
|
||||
def get_default_branch_sha(self, owner: str, repo: str, branch_name: str) -> str:
|
||||
"""Retrieve the commit SHA of the specified branch.
|
||||
|
||||
Args:
|
||||
owner (str): Owner of the repository.
|
||||
repo (str): Name of the repository.
|
||||
branch (str): Name of the branch.
|
||||
owner: Owner of the repository.
|
||||
repo: Name of the repository.
|
||||
branch_name: Name of the branch.
|
||||
|
||||
Returns:
|
||||
str: The commit SHA of the specified branch.
|
||||
The commit SHA of the specified branch.
|
||||
|
||||
Raises:
|
||||
requests.HTTPError: If the API request fails.
|
||||
"""
|
||||
url = f'{self.gitea_url}/repos/{owner}/{repo}/branches/{branch}'
|
||||
response = self.session.get(url)
|
||||
api_url = f'{self.ROOT_URL}/repos/{owner}/{repo}/branches/{branch_name}'
|
||||
response = self.session.get(api_url)
|
||||
response.raise_for_status()
|
||||
data = response.json()
|
||||
return data['commit']['sha']
|
||||
branch_data = response.json()
|
||||
return branch_data['commit']['sha']
|
||||
|
||||
def create_branch(self, owner: str, repo: str, new_branch: str, sha: str) -> bool:
|
||||
def create_branch(
|
||||
self,
|
||||
owner: str,
|
||||
repo: str,
|
||||
new_branch_name: str,
|
||||
commit_sha: str,
|
||||
) -> bool:
|
||||
"""Create a new branch from the provided SHA.
|
||||
|
||||
Args:
|
||||
owner (str): Owner of the repository.
|
||||
repo (str): Name of the repository.
|
||||
new_branch (str): Name of the new branch to create.
|
||||
sha (str): Commit SHA to use as the starting point for the new branch.
|
||||
owner: Owner of the repository.
|
||||
repo: Name of the repository.
|
||||
new_branch_name: Name of the new branch to create.
|
||||
commit_sha: Commit SHA to use as the starting point for the new branch.
|
||||
|
||||
Returns:
|
||||
bool: True if the branch was created successfully, False if the branch already exists.
|
||||
True if the branch was created successfully, False if the branch already exists.
|
||||
|
||||
Raises:
|
||||
requests.HTTPError: If the API request fails for reasons other than branch already existing.
|
||||
"""
|
||||
url = f'{self.gitea_url}/repos/{owner}/{repo}/git/refs'
|
||||
json_data = {'ref': f'refs/heads/{new_branch}', 'sha': sha}
|
||||
response = self.session.post(url, json=json_data)
|
||||
if response.status_code == 422:
|
||||
logger.warning('Branch %s already exists.', new_branch)
|
||||
api_url = f'{self.ROOT_URL}/repos/{owner}/{repo}/git/refs'
|
||||
request_payload = {'ref': f'refs/heads/{new_branch_name}', 'sha': commit_sha}
|
||||
response = self.session.post(api_url, json=request_payload)
|
||||
if response.status_code == UNPROCESSABLE_ENTITY_STATUS_CODE:
|
||||
logger.warning('Branch %s already exists.', new_branch_name)
|
||||
return False
|
||||
response.raise_for_status()
|
||||
return True
|
||||
|
||||
def get_issues(self, owner: str, repo: str) -> list[dict[str, str]]:
|
||||
def get_issues(self, owner: str, repo: str) -> list[GiteaIssue]:
|
||||
"""Download issues from the specified repository and filter those with the 'aider' label.
|
||||
|
||||
Args:
|
||||
owner (str): Owner of the repository.
|
||||
repo (str): Name of the repository.
|
||||
owner: Owner of the repository.
|
||||
repo: Name of the repository.
|
||||
|
||||
Returns:
|
||||
list: A list of issue dictionaries, filtered to only include issues with the 'aider' label.
|
||||
A list of GiteaIssue objects, filtered to only include issues with the 'aider' label.
|
||||
|
||||
Raises:
|
||||
requests.HTTPError: If the API request fails.
|
||||
"""
|
||||
url = f'{self.gitea_url}/repos/{owner}/{repo}/issues'
|
||||
response = self.session.get(url)
|
||||
api_url = f'{self.ROOT_URL}/repos/{owner}/{repo}/issues'
|
||||
response = self.session.get(api_url)
|
||||
response.raise_for_status()
|
||||
issues = response.json()
|
||||
issues_data = response.json()
|
||||
|
||||
# Filter to only include issues marked with the "aider" label.
|
||||
issues = [
|
||||
issue
|
||||
for issue in issues
|
||||
if any(label.get('name') == 'aider' for label in issue.get('labels', []))
|
||||
filtered_issues = [
|
||||
issue_data
|
||||
for issue_data in issues_data
|
||||
if any(
|
||||
label_data.get('name') == AIDER_LABEL_NAME
|
||||
for label_data in issue_data.get('labels', [])
|
||||
)
|
||||
]
|
||||
return issues
|
||||
|
||||
# Convert to dataclass objects
|
||||
gitea_issues = []
|
||||
for issue_data in filtered_issues:
|
||||
labels = [
|
||||
GiteaLabel(
|
||||
id=label_data['id'],
|
||||
name=label_data['name'],
|
||||
color=label_data['color'],
|
||||
description=label_data.get('description', ''),
|
||||
)
|
||||
for label_data in issue_data.get('labels', [])
|
||||
]
|
||||
|
||||
user = GiteaUser(
|
||||
login=issue_data['user']['login'],
|
||||
id=issue_data['user']['id'],
|
||||
full_name=issue_data['user'].get('full_name', ''),
|
||||
email=issue_data['user'].get('email', ''),
|
||||
avatar_url=issue_data['user'].get('avatar_url', ''),
|
||||
)
|
||||
|
||||
assignees = [
|
||||
GiteaUser(
|
||||
login=assignee_data['login'],
|
||||
id=assignee_data['id'],
|
||||
full_name=assignee_data.get('full_name', ''),
|
||||
email=assignee_data.get('email', ''),
|
||||
avatar_url=assignee_data.get('avatar_url', ''),
|
||||
)
|
||||
for assignee_data in issue_data.get('assignees', [])
|
||||
]
|
||||
|
||||
gitea_issue = GiteaIssue(
|
||||
id=issue_data['id'],
|
||||
number=issue_data['number'],
|
||||
title=issue_data['title'],
|
||||
body=issue_data.get('body', ''),
|
||||
state=issue_data['state'],
|
||||
labels=labels,
|
||||
user=user,
|
||||
assignees=assignees,
|
||||
html_url=issue_data['html_url'],
|
||||
created_at=issue_data['created_at'],
|
||||
updated_at=issue_data['updated_at'],
|
||||
)
|
||||
gitea_issues.append(gitea_issue)
|
||||
|
||||
return gitea_issues
|
||||
|
||||
def iter_user_repositories(
|
||||
self,
|
||||
owner: str,
|
||||
owner_name: str,
|
||||
only_those_with_issues: bool = False,
|
||||
) -> Iterator[str]:
|
||||
"""Get a list of repositories for a given user.
|
||||
|
||||
Args:
|
||||
owner (str): The owner of the repositories.
|
||||
only_those_with_issues (bool): If True, only return repositories with issues enabled.
|
||||
owner_name: The owner of the repositories.
|
||||
only_those_with_issues: If True, only return repositories with issues enabled.
|
||||
|
||||
Returns:
|
||||
Iterator[str]: An iterator of repository names.
|
||||
An iterator of repository names.
|
||||
"""
|
||||
url = f'{self.gitea_url}/user/repos'
|
||||
response = self.session.get(url)
|
||||
api_url = f'{self.ROOT_URL}/user/repos'
|
||||
response = self.session.get(api_url)
|
||||
response.raise_for_status()
|
||||
|
||||
for repo in response.json():
|
||||
if only_those_with_issues and not repo['has_issues']:
|
||||
for repository_data in response.json():
|
||||
if only_those_with_issues and not repository_data['has_issues']:
|
||||
continue
|
||||
if repo['owner']['login'].lower() != owner.lower():
|
||||
if repository_data['owner']['login'].lower() != owner_name.lower():
|
||||
continue
|
||||
yield repo['name']
|
||||
yield repository_data['name']
|
||||
|
||||
def create_pull_request(
|
||||
self,
|
||||
|
@ -157,54 +242,61 @@ class GiteaClient:
|
|||
Raises:
|
||||
requests.HTTPError: If the API request fails.
|
||||
"""
|
||||
url = f'{self.gitea_url}/repos/{owner}/{repo}/pulls'
|
||||
json_data = {
|
||||
api_url = f'{self.ROOT_URL}/repos/{owner}/{repo}/pulls'
|
||||
request_payload = {
|
||||
'title': title,
|
||||
'body': body,
|
||||
'head': head,
|
||||
'base': base,
|
||||
}
|
||||
|
||||
response = self.session.post(url, json=json_data)
|
||||
response = self.session.post(api_url, json=request_payload)
|
||||
# If a pull request for this head/base already exists, return it instead of crashing
|
||||
if response.status_code == 409:
|
||||
if response.status_code == CONFLICT_STATUS_CODE:
|
||||
logger.warning(
|
||||
'Pull request already exists for head %s and base %s',
|
||||
head,
|
||||
base,
|
||||
)
|
||||
prs = self.get_pull_requests(owner, repo)
|
||||
for pr in prs:
|
||||
existing_pull_requests = self.get_pull_requests(owner, repo)
|
||||
for existing_pr in existing_pull_requests:
|
||||
if (
|
||||
pr.get('head', {}).get('ref') == head
|
||||
and pr.get('base', {}).get('ref') == base
|
||||
existing_pr.get('head', {}).get('ref') == head
|
||||
and existing_pr.get('base', {}).get('ref') == base
|
||||
):
|
||||
return pr
|
||||
return existing_pr
|
||||
# fallback to raise if we can’t find it
|
||||
response.raise_for_status()
|
||||
response.raise_for_status()
|
||||
return response.json()
|
||||
|
||||
def get_failed_pipelines(self, owner: str, repo: str, pr_number: str) -> list[int]:
|
||||
def get_failed_pipelines(
|
||||
self,
|
||||
owner: str,
|
||||
repo: str,
|
||||
pull_request_number: str,
|
||||
) -> list[int]:
|
||||
"""Fetch pipeline runs for a PR and return IDs of failed runs."""
|
||||
url = f'{self.gitea_url}/repos/{owner}/{repo}/actions/runs'
|
||||
response = self.session.get(url)
|
||||
api_url = f'{self.ROOT_URL}/repos/{owner}/{repo}/actions/runs'
|
||||
response = self.session.get(api_url)
|
||||
response.raise_for_status()
|
||||
runs = response.json().get('workflow_runs', [])
|
||||
failed = []
|
||||
for run in runs:
|
||||
workflow_runs = response.json().get('workflow_runs', [])
|
||||
failed_run_ids = []
|
||||
for workflow_run in workflow_runs:
|
||||
if any(
|
||||
pr.get('number') == int(pr_number)
|
||||
for pr in run.get('pull_requests', [])
|
||||
pull_request.get('number') == int(pull_request_number)
|
||||
for pull_request in workflow_run.get('pull_requests', [])
|
||||
):
|
||||
if run.get('conclusion') not in ('success',):
|
||||
failed.append(run.get('id'))
|
||||
return failed
|
||||
if workflow_run.get('conclusion') != SUCCESS_CONCLUSION:
|
||||
failed_run_ids.append(workflow_run.get('id'))
|
||||
return failed_run_ids
|
||||
|
||||
def get_pipeline_log(self, owner: str, repo: str, run_id: int) -> str:
|
||||
def get_pipeline_log(self, owner: str, repo: str, workflow_run_id: int) -> str:
|
||||
"""Download the logs for a pipeline run."""
|
||||
url = f'{self.gitea_url}/repos/{owner}/{repo}/actions/runs/{run_id}/logs'
|
||||
response = self.session.get(url)
|
||||
api_url = (
|
||||
f'{self.ROOT_URL}/repos/{owner}/{repo}/actions/runs/{workflow_run_id}/logs'
|
||||
)
|
||||
response = self.session.get(api_url)
|
||||
response.raise_for_status()
|
||||
return response.text
|
||||
|
||||
|
@ -212,10 +304,12 @@ class GiteaClient:
|
|||
self,
|
||||
owner: str,
|
||||
repo: str,
|
||||
state: str = 'open',
|
||||
) -> list[dict]:
|
||||
pull_request_state: str = 'open',
|
||||
) -> list[dict[str, Any]]:
|
||||
"""Fetch pull requests for a repository."""
|
||||
url = f'{self.gitea_url}/repos/{owner}/{repo}/pulls?state={state}'
|
||||
response = self.session.get(url)
|
||||
api_url = (
|
||||
f'{self.ROOT_URL}/repos/{owner}/{repo}/pulls?state={pull_request_state}'
|
||||
)
|
||||
response = self.session.get(api_url)
|
||||
response.raise_for_status()
|
||||
return response.json()
|
||||
|
|
111
aider_gitea/models.py
Normal file
111
aider_gitea/models.py
Normal file
|
@ -0,0 +1,111 @@
|
|||
"""Data models for Gitea API responses.
|
||||
|
||||
This module contains dataclasses that represent the structure of data
|
||||
returned from the Gitea API, providing type safety and better code organization.
|
||||
"""
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import Any
|
||||
|
||||
|
||||
@dataclass
|
||||
class GiteaUser:
|
||||
"""Represents a Gitea user."""
|
||||
|
||||
login: str # User's login name
|
||||
id: int # User's ID
|
||||
full_name: str # User's full name
|
||||
email: str # User's email address
|
||||
avatar_url: str # URL to user's avatar image
|
||||
|
||||
|
||||
@dataclass
|
||||
class GiteaLabel:
|
||||
"""Represents a label in Gitea."""
|
||||
|
||||
id: int # Label ID
|
||||
name: str # Label name
|
||||
color: str # Label color (hex code)
|
||||
description: str # Label description
|
||||
|
||||
|
||||
@dataclass
|
||||
class GiteaCommit:
|
||||
"""Represents a commit in Gitea."""
|
||||
|
||||
sha: str # Commit SHA hash
|
||||
url: str # API URL for the commit
|
||||
message: str # Commit message
|
||||
author: GiteaUser # Commit author
|
||||
|
||||
|
||||
@dataclass
|
||||
class GiteaBranch:
|
||||
"""Represents a branch in Gitea."""
|
||||
|
||||
name: str # Branch name
|
||||
commit: GiteaCommit # Latest commit on the branch
|
||||
protected: bool # Whether the branch is protected
|
||||
|
||||
|
||||
@dataclass
|
||||
class GiteaRepository:
|
||||
"""Represents a repository in Gitea."""
|
||||
|
||||
id: int # Repository ID
|
||||
name: str # Repository name
|
||||
full_name: str # Full repository name (owner/repo)
|
||||
owner: GiteaUser # Repository owner
|
||||
description: str # Repository description
|
||||
clone_url: str # URL for cloning the repository
|
||||
has_issues: bool # Whether issues are enabled
|
||||
default_branch: str # Default branch name
|
||||
|
||||
|
||||
@dataclass
|
||||
class GiteaIssue:
|
||||
"""Represents an issue in Gitea."""
|
||||
|
||||
id: int # Issue ID
|
||||
number: int # Issue number
|
||||
title: str # Issue title
|
||||
body: str # Issue description/body
|
||||
state: str # Issue state (open, closed, etc.)
|
||||
labels: list[GiteaLabel] # List of labels attached to the issue
|
||||
user: GiteaUser # User who created the issue
|
||||
assignees: list[GiteaUser] # List of assigned users
|
||||
html_url: str # Web URL for the issue
|
||||
created_at: str # ISO datetime when issue was created
|
||||
updated_at: str # ISO datetime when issue was last updated
|
||||
|
||||
|
||||
@dataclass
|
||||
class GiteaPullRequest:
|
||||
"""Represents a pull request in Gitea."""
|
||||
|
||||
id: int # Pull request ID
|
||||
number: int # Pull request number
|
||||
title: str # Pull request title
|
||||
body: str # Pull request description
|
||||
state: str # Pull request state (open, closed, merged)
|
||||
user: GiteaUser # User who created the pull request
|
||||
head: dict[str, Any] # Head branch information
|
||||
base: dict[str, Any] # Base branch information
|
||||
html_url: str # Web URL for the pull request
|
||||
created_at: str # ISO datetime when PR was created
|
||||
updated_at: str # ISO datetime when PR was last updated
|
||||
merged_at: str | None # ISO datetime when PR was merged (if applicable)
|
||||
|
||||
|
||||
@dataclass
|
||||
class GiteaWorkflowRun:
|
||||
"""Represents a GitHub Actions workflow run in Gitea."""
|
||||
|
||||
id: int # Workflow run ID
|
||||
name: str # Workflow name
|
||||
status: str # Run status (queued, in_progress, completed)
|
||||
conclusion: str | None # Run conclusion (success, failure, cancelled, etc.)
|
||||
workflow_id: int # ID of the workflow
|
||||
pull_requests: list[dict[str, Any]] # Associated pull requests
|
||||
created_at: str # ISO datetime when run was created
|
||||
updated_at: str # ISO datetime when run was last updated
|
|
@ -35,12 +35,6 @@ class TestClaudeCodeIntegration:
|
|||
assert not is_anthropic_model('')
|
||||
assert not is_anthropic_model(None)
|
||||
|
||||
# Test error handling for invalid types
|
||||
with pytest.raises(TypeError):
|
||||
is_anthropic_model(123)
|
||||
with pytest.raises(TypeError):
|
||||
is_anthropic_model(['claude'])
|
||||
|
||||
def test_create_code_solver_routing(self, monkeypatch):
|
||||
"""Test that the correct solver is created based on model."""
|
||||
import aider_gitea
|
||||
|
|
|
@ -1,11 +1,18 @@
|
|||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import requests
|
||||
|
||||
from aider_gitea.gitea_client import GiteaClient
|
||||
|
||||
|
||||
class TestGiteaClientPRLabels:
|
||||
def setup_method(self):
|
||||
self.client = GiteaClient('https://gitea.example.com', 'fake_token')
|
||||
session = requests.Session()
|
||||
self.client = GiteaClient(
|
||||
session,
|
||||
gitea_url='https://gitea.example.com',
|
||||
token='fake_token',
|
||||
)
|
||||
|
||||
@patch('requests.Session.post')
|
||||
def test_create_pull_request_with_labels(self, mock_post):
|
||||
|
|
|
@ -1,139 +0,0 @@
|
|||
import pytest
|
||||
|
||||
from aider_gitea import RepositoryConfig, generate_branch_name
|
||||
|
||||
|
||||
class TestRepositoryConfigValidation:
|
||||
"""Test validation in RepositoryConfig dataclass."""
|
||||
|
||||
def test_valid_repository_config(self):
|
||||
"""Test that valid configurations work correctly."""
|
||||
config = RepositoryConfig(
|
||||
gitea_url='https://gitea.example.com',
|
||||
owner='testowner',
|
||||
repo='testrepo',
|
||||
base_branch='main',
|
||||
)
|
||||
assert config.gitea_url == 'https://gitea.example.com'
|
||||
assert config.owner == 'testowner'
|
||||
assert config.repo == 'testrepo'
|
||||
assert config.base_branch == 'main'
|
||||
|
||||
def test_empty_gitea_url_validation(self):
|
||||
"""Test that empty gitea_url raises ValueError."""
|
||||
with pytest.raises(ValueError, match='gitea_url cannot be empty'):
|
||||
RepositoryConfig(
|
||||
gitea_url='',
|
||||
owner='testowner',
|
||||
repo='testrepo',
|
||||
base_branch='main',
|
||||
)
|
||||
|
||||
def test_empty_owner_validation(self):
|
||||
"""Test that empty owner raises ValueError."""
|
||||
with pytest.raises(ValueError, match='owner cannot be empty'):
|
||||
RepositoryConfig(
|
||||
gitea_url='https://gitea.example.com',
|
||||
owner='',
|
||||
repo='testrepo',
|
||||
base_branch='main',
|
||||
)
|
||||
|
||||
def test_empty_repo_validation(self):
|
||||
"""Test that empty repo raises ValueError."""
|
||||
with pytest.raises(ValueError, match='repo cannot be empty'):
|
||||
RepositoryConfig(
|
||||
gitea_url='https://gitea.example.com',
|
||||
owner='testowner',
|
||||
repo='',
|
||||
base_branch='main',
|
||||
)
|
||||
|
||||
def test_empty_base_branch_validation(self):
|
||||
"""Test that empty base_branch raises ValueError."""
|
||||
with pytest.raises(ValueError, match='base_branch cannot be empty'):
|
||||
RepositoryConfig(
|
||||
gitea_url='https://gitea.example.com',
|
||||
owner='testowner',
|
||||
repo='testrepo',
|
||||
base_branch='',
|
||||
)
|
||||
|
||||
def test_gitea_url_api_v1_suffix_validation(self):
|
||||
"""Test that gitea_url with /api/v1 suffix raises ValueError."""
|
||||
with pytest.raises(
|
||||
ValueError,
|
||||
match="gitea_url should not include '/api/v1' suffix",
|
||||
):
|
||||
RepositoryConfig(
|
||||
gitea_url='https://gitea.example.com/api/v1',
|
||||
owner='testowner',
|
||||
repo='testrepo',
|
||||
base_branch='main',
|
||||
)
|
||||
|
||||
def test_whitespace_only_fields_validation(self):
|
||||
"""Test that whitespace-only fields raise ValueError."""
|
||||
with pytest.raises(ValueError, match='owner cannot be empty'):
|
||||
RepositoryConfig(
|
||||
gitea_url='https://gitea.example.com',
|
||||
owner=' ',
|
||||
repo='testrepo',
|
||||
base_branch='main',
|
||||
)
|
||||
|
||||
|
||||
class TestBranchNameGeneration:
|
||||
"""Test validation in generate_branch_name function."""
|
||||
|
||||
def test_valid_branch_name_generation(self):
|
||||
"""Test that valid inputs generate correct branch names."""
|
||||
result = generate_branch_name('123', 'Fix authentication bug')
|
||||
expected = 'issue-123-fix-authentication-bug'
|
||||
assert result == expected
|
||||
|
||||
def test_empty_issue_number_validation(self):
|
||||
"""Test that empty issue_number raises ValueError."""
|
||||
with pytest.raises(ValueError, match='Issue number cannot be empty'):
|
||||
generate_branch_name('', 'Fix bug')
|
||||
|
||||
def test_whitespace_issue_number_validation(self):
|
||||
"""Test that whitespace-only issue_number raises ValueError."""
|
||||
with pytest.raises(ValueError, match='Issue number cannot be empty'):
|
||||
generate_branch_name(' ', 'Fix bug')
|
||||
|
||||
def test_non_string_issue_number_validation(self):
|
||||
"""Test that non-string issue_number raises TypeError."""
|
||||
with pytest.raises(
|
||||
TypeError,
|
||||
match='Both issue_number and issue_title must be strings',
|
||||
):
|
||||
generate_branch_name(123, 'Fix bug')
|
||||
|
||||
def test_non_string_issue_title_validation(self):
|
||||
"""Test that non-string issue_title raises TypeError."""
|
||||
with pytest.raises(
|
||||
TypeError,
|
||||
match='Both issue_number and issue_title must be strings',
|
||||
):
|
||||
generate_branch_name('123', 456)
|
||||
|
||||
def test_empty_issue_title_handling(self):
|
||||
"""Test that empty issue_title is handled gracefully."""
|
||||
result = generate_branch_name('123', '')
|
||||
expected = 'issue-123-untitled'
|
||||
assert result == expected
|
||||
|
||||
def test_none_issue_title_handling(self):
|
||||
"""Test that None issue_title is handled gracefully."""
|
||||
with pytest.raises(
|
||||
TypeError,
|
||||
match='Both issue_number and issue_title must be strings',
|
||||
):
|
||||
generate_branch_name('123', None)
|
||||
|
||||
def test_special_characters_sanitization(self):
|
||||
"""Test that special characters are properly sanitized."""
|
||||
result = generate_branch_name('456', 'Fix #bug [with] @special! chars?')
|
||||
expected = 'issue-456-fix-bug-with-special-chars'
|
||||
assert result == expected
|
Loading…
Reference in New Issue
Block a user