Coverage for cli / src / covered / cli.py: 100%
99 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-06-19 09:10 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-06-19 09:10 +0000
1import asyncio
2from datetime import datetime
3from pathlib import Path
4import re
5from typing import Annotated
6import warnings
8import httpx
9import stamina
10import typer
11from aiobotocore.session import get_session
13app = typer.Typer()
16COV_PATTERN = re.compile(r'<span\s+class="pc_cov">\s*([\d.]+)%\s*</span>')
19def _get_coverage_info(cov_report_path: Path) -> float | None:
20 cov_index = cov_report_path / "index.html"
22 if not cov_index.exists():
23 return None
24 m = COV_PATTERN.search(cov_index.read_text())
25 if m:
26 return float(m.group(1))
27 return None
30async def _request(
31 method: str,
32 url: str,
33 *,
34 headers: dict[str, str] | None = None,
35 json: dict | None = None,
36 timeout: int = 60,
37) -> httpx.Response:
38 async with httpx.AsyncClient(timeout=timeout) as client:
39 async for attempt in stamina.retry_context(
40 on=(httpx.TransportError, httpx.HTTPStatusError),
41 attempts=3,
42 wait_jitter=2.0,
43 ):
44 with attempt:
45 resp = await client.request(method, url, headers=headers, json=json)
46 if resp.status_code >= 500:
47 resp.raise_for_status()
48 return resp
51async def _upload_files(
52 directory: Path,
53 session: dict,
54 concurrency: int,
55) -> int:
56 semaphore = asyncio.Semaphore(concurrency)
57 files = [f for f in directory.rglob("*") if f.is_file()]
59 async with get_session().create_client(
60 "s3",
61 region_name=session["region"],
62 aws_access_key_id=session["access_key_id"],
63 aws_secret_access_key=session["secret_access_key"],
64 aws_session_token=session["session_token"],
65 ) as s3:
67 async def upload_one(file_path: Path):
68 key = f"sites/{session['site_id']}/{file_path.relative_to(directory)}"
69 async with semaphore:
70 await s3.put_object(
71 Bucket=session["bucket"],
72 Key=key,
73 Body=file_path.read_bytes(),
74 )
76 results = await asyncio.gather(
77 *(upload_one(f) for f in files),
78 return_exceptions=True,
79 )
80 for r in results:
81 if isinstance(r, BaseException):
82 raise r
84 return len(files)
87async def _main(
88 directory: Path,
89 api_url: str,
90 api_key: str,
91 concurrency: int,
92 repo_owner: str,
93 repo_name: str,
94 commit_sha: str,
95 coverage_threshold: float,
96 gh_token: str,
97 purge_cache: bool,
98) -> None:
99 start_time = datetime.now()
100 typer.echo("Creating upload session...")
101 resp = await _request(
102 "POST",
103 f"{api_url}/coverage/create-site/",
104 headers={"token": api_key},
105 timeout=120,
106 )
107 session = resp.json()
108 typer.echo(f"Session created: site_id={session['site_id']}")
110 typer.echo(f"Uploading files from {directory}...")
111 count = await _upload_files(directory, session, concurrency)
113 typer.echo(f"Uploaded {count} files in {datetime.now() - start_time}.")
115 coverage_val = _get_coverage_info(directory)
116 if coverage_val is None:
117 typer.echo("Coverage info not found in the report. Skipping status update.")
118 return
120 typer.echo(f"Coverage value is {coverage_val}")
121 typer.echo("Updating commit status...")
123 status_state = "success" if coverage_val >= coverage_threshold else "failure"
125 status_url = (
126 f"https://api.github.com/repos/{repo_owner}/{repo_name}/statuses/{commit_sha}"
127 )
128 status_data = {
129 "state": status_state,
130 "description": f"Coverage {coverage_val}%",
131 "target_url": f"{api_url}/coverage/{session['site_id']}/",
132 "context": "coverage",
133 }
134 status_resp = await _request(
135 "POST",
136 status_url,
137 headers={
138 "Authorization": f"Bearer {gh_token}",
139 "Accept": "application/vnd.github+json",
140 },
141 json=status_data,
142 )
144 if status_resp.status_code != 201:
145 typer.echo(f"Failed to set commit status: {status_resp.status_code}", err=True)
146 typer.echo(f"Response: {status_resp.text}", err=True)
147 raise typer.Exit(1)
149 typer.echo("Commit status set successfully")
151 if not purge_cache:
152 typer.echo("No `--purge-cache` set, skip purging Camo cache")
153 return
155 # Clear badge cache
156 resp = await _request(
157 "POST",
158 f"{api_url}/coverage/invalidate-cache/{repo_owner}/{repo_name}/",
159 headers={"token": api_key},
160 )
161 if resp.status_code == 200:
162 typer.echo("Cache invalidated successfully")
163 else:
164 typer.echo(f"Failed to invalidate cache: {resp.status_code}", err=True)
165 typer.echo(f"Response: {resp.text}", err=True)
167 # Purge github Camo cache for the badge
168 badge_url_re = re.compile(
169 rf'<img\s[^>]*src="(https://camo\.githubusercontent\.com/[a-f0-9]+/[a-zA-Z0-9]+)"[^>]*data-canonical-src="{re.escape(api_url)}[^"]*"'
170 )
172 readme_url = f"https://api.github.com/repos/{repo_owner}/{repo_name}/readme"
173 readme_resp = await _request(
174 "GET",
175 readme_url,
176 headers={
177 "Authorization": f"Bearer {gh_token}",
178 "Accept": "application/vnd.github.html+json",
179 },
180 timeout=30,
181 )
182 if readme_resp.status_code != 200:
183 typer.echo(f"Failed to fetch README: {readme_resp.status_code}", err=True)
184 return
186 match = badge_url_re.search(readme_resp.text)
187 if not match:
188 typer.echo("Badge URL not found in README")
189 return
191 badge_url = match.group(1)
192 typer.echo(f"Purging Camo cache for: {badge_url}")
193 purge_resp = await _request("PURGE", badge_url, timeout=30)
194 if purge_resp.status_code == 200:
195 typer.echo("Camo cache purged successfully")
196 else:
197 typer.echo(f"Failed to purge Camo cache: {purge_resp.status_code}", err=True)
200@app.command()
201def upload(
202 *,
203 directory: Annotated[
204 Path,
205 typer.Argument(
206 help="Directory to upload", dir_okay=True, file_okay=False, exists=True
207 ),
208 ],
209 api_url: Annotated[
210 str, typer.Option(envvar="COVERED_API_URL", help="Backend API base URL")
211 ],
212 api_key: Annotated[
213 str, typer.Option(envvar="COVERED_API_KEY", help="API key for authentication")
214 ],
215 concurrency: Annotated[int, typer.Option(help="Max concurrent uploads")] = 50,
216 repo_owner: Annotated[
217 str, typer.Option(envvar="COVERED_REPO_OWNER", help="GitHub repository owner")
218 ],
219 repo_name: Annotated[
220 str, typer.Option(envvar="COVERED_REPO_NAME", help="GitHub repository name")
221 ],
222 commit_sha: Annotated[
223 str, typer.Option(envvar="COVERED_COMMIT_SHA", help="Git commit SHA")
224 ],
225 coverage_threshold: Annotated[
226 float,
227 typer.Option(
228 envvar="COVERED_COVERAGE_THRESHOLD",
229 help="Minimum coverage percentage to set success status",
230 ),
231 ] = 100.0,
232 gh_token: Annotated[
233 str,
234 typer.Option(
235 envvar="COVERED_GH_TOKEN", help="GitHub token for setting commit status"
236 ),
237 ],
238 is_default_branch: Annotated[
239 bool | None,
240 typer.Option(
241 envvar="COVERED_IS_DEFAULT_BRANCH",
242 help="(DEPRECATED, use `--purge-cache` instead) Whether this is the default branch (enables cache invalidation)",
243 show_default="False",
244 ),
245 ] = None,
246 purge_cache: Annotated[
247 bool,
248 typer.Option(
249 envvar="COVERED_PURGE_CACHE",
250 help="Whether to purge the GitHub Camo cache after upload (to prevent showing stale badge)",
251 ),
252 ] = False,
253) -> None:
254 """Upload a directory to a temporary site."""
256 if is_default_branch is not None:
257 warnings.warn(
258 "The `--is-default-branch` option is deprecated and will be removed in a future release. Please use `--purge-cache` instead.",
259 DeprecationWarning,
260 )
262 use_purge_cache = purge_cache or (is_default_branch is True)
264 if api_url.endswith("/"):
265 raise typer.BadParameter("must not end with a slash", param_hint="--api-url")
267 asyncio.run(
268 _main(
269 directory=directory,
270 api_url=api_url,
271 api_key=api_key,
272 concurrency=concurrency,
273 repo_owner=repo_owner,
274 repo_name=repo_name,
275 commit_sha=commit_sha,
276 coverage_threshold=coverage_threshold,
277 gh_token=gh_token,
278 purge_cache=use_purge_cache,
279 )
280 )
283if __name__ == "__main__":
284 app()