Coverage for cli / src / covered / cli.py: 100%

99 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-06-19 09:10 +0000

1import asyncio 

2from datetime import datetime 

3from pathlib import Path 

4import re 

5from typing import Annotated 

6import warnings 

7 

8import httpx 

9import stamina 

10import typer 

11from aiobotocore.session import get_session 

12 

# Typer application object; CLI commands are registered on it via @app.command().
app = typer.Typer()

14 

15 

# Captures the numeric percentage inside a `<span class="pc_cov">…%</span>`
# element; _get_coverage_info() searches the report's index.html with it.
COV_PATTERN = re.compile(r'<span\s+class="pc_cov">\s*([\d.]+)%\s*</span>')

17 

18 

19def _get_coverage_info(cov_report_path: Path) -> float | None: 

20 cov_index = cov_report_path / "index.html" 

21 

22 if not cov_index.exists(): 

23 return None 

24 m = COV_PATTERN.search(cov_index.read_text()) 

25 if m: 

26 return float(m.group(1)) 

27 return None 

28 

29 

async def _request(
    method: str,
    url: str,
    *,
    headers: dict[str, str] | None = None,
    json: dict | None = None,
    timeout: int = 60,
) -> httpx.Response:
    """Send one HTTP request, retrying on transport errors and 5xx replies.

    A fresh ``httpx.AsyncClient`` is opened for the call. Up to three
    attempts are made through ``stamina.retry_context``; an attempt is
    retried when it raises ``httpx.TransportError`` or when the response
    status is >= 500 (turned into ``httpx.HTTPStatusError`` below).

    Args:
        method: HTTP method name passed to ``client.request``.
        url: Target URL.
        headers: Optional request headers.
        json: Optional JSON body.
        timeout: Timeout handed to the ``AsyncClient`` constructor.

    Returns:
        The response of the first attempt that does not raise. Responses
        with a status below 500 are returned as-is, whatever the status.
    """
    retry_on = (httpx.TransportError, httpx.HTTPStatusError)

    async with httpx.AsyncClient(timeout=timeout) as http:
        async for attempt in stamina.retry_context(
            on=retry_on,
            attempts=3,
            wait_jitter=2.0,
        ):
            with attempt:
                response = await http.request(
                    method, url, headers=headers, json=json
                )
                # Raising inside the attempt context is what schedules a retry.
                if response.status_code >= 500:
                    response.raise_for_status()
                return response

49 

50 

async def _upload_files(
    directory: Path,
    session: dict,
    concurrency: int,
) -> int:
    """Upload every regular file under *directory* to S3.

    Files are found recursively with ``directory.rglob("*")`` and stored
    under ``sites/<site_id>/<path relative to directory>`` in the bucket
    named by *session*.

    Args:
        directory: Root directory whose files are uploaded.
        session: Mapping read for the keys ``region``, ``access_key_id``,
            ``secret_access_key``, ``session_token``, ``site_id`` and
            ``bucket``.
        concurrency: Maximum number of ``put_object`` calls in flight.

    Returns:
        The number of files found under *directory*.

    Raises:
        ValueError: If *concurrency* is smaller than 1.
        BaseException: The first exception raised by any upload, re-raised
            after all uploads have finished.
    """
    if concurrency < 1:
        # Semaphore(0) never admits a task, so every upload would wait forever.
        raise ValueError("concurrency must be at least 1")

    semaphore = asyncio.Semaphore(concurrency)
    files = [f for f in directory.rglob("*") if f.is_file()]

    async with get_session().create_client(
        "s3",
        region_name=session["region"],
        aws_access_key_id=session["access_key_id"],
        aws_secret_access_key=session["secret_access_key"],
        aws_session_token=session["session_token"],
    ) as s3:

        async def upload_one(file_path: Path) -> None:
            # as_posix() keeps "/" as the separator in the object key even
            # when running on Windows, where str(path) uses backslashes.
            relative_key = file_path.relative_to(directory).as_posix()
            key = f"sites/{session['site_id']}/{relative_key}"
            async with semaphore:
                await s3.put_object(
                    Bucket=session["bucket"],
                    Key=key,
                    Body=file_path.read_bytes(),
                )

        # Collect exceptions instead of failing fast so that every upload
        # finishes before the first error is re-raised.
        results = await asyncio.gather(
            *(upload_one(f) for f in files),
            return_exceptions=True,
        )
        for r in results:
            if isinstance(r, BaseException):
                raise r

    return len(files)

85 

86 

async def _main(
    directory: Path,
    api_url: str,
    api_key: str,
    concurrency: int,
    repo_owner: str,
    repo_name: str,
    commit_sha: str,
    coverage_threshold: float,
    gh_token: str,
    purge_cache: bool,
) -> None:
    """Upload a coverage report, set the commit status, optionally purge caches.

    Steps, in order:

    1. Create an upload session through ``{api_url}/coverage/create-site/``.
    2. Upload every file under *directory* with ``_upload_files``.
    3. Read the coverage percentage from the report; stop if none is found.
    4. Post a GitHub commit status (``success`` when the percentage is at
       least *coverage_threshold*, ``failure`` otherwise).
    5. When *purge_cache* is true, invalidate the backend badge cache and
       purge the badge's Camo URL found in the repository README.

    Args:
        directory: Directory holding the HTML coverage report.
        api_url: Backend base URL (used without a trailing slash).
        api_key: Sent as the ``token`` header to the backend.
        concurrency: Maximum concurrent uploads.
        repo_owner: GitHub repository owner.
        repo_name: GitHub repository name.
        commit_sha: Commit the status is attached to.
        coverage_threshold: Minimum percentage for a ``success`` status.
        gh_token: Bearer token for the GitHub API calls.
        purge_cache: Whether to run the cache purge steps.

    Raises:
        typer.Exit: With code 1 when the upload session cannot be created
            or the commit status request does not return 201.
    """
    start_time = datetime.now()
    typer.echo("Creating upload session...")
    resp = await _request(
        "POST",
        f"{api_url}/coverage/create-site/",
        headers={"token": api_key},
        timeout=120,
    )
    # Without this check a rejected request (e.g. bad API key) would only
    # surface later as a JSON decode error or a KeyError on 'site_id'.
    if not 200 <= resp.status_code < 300:
        typer.echo(f"Failed to create upload session: {resp.status_code}", err=True)
        typer.echo(f"Response: {resp.text}", err=True)
        raise typer.Exit(1)

    session = resp.json()
    typer.echo(f"Session created: site_id={session['site_id']}")

    typer.echo(f"Uploading files from {directory}...")
    count = await _upload_files(directory, session, concurrency)

    # Elapsed time is measured from before the session was created.
    typer.echo(f"Uploaded {count} files in {datetime.now() - start_time}.")

    coverage_val = _get_coverage_info(directory)
    if coverage_val is None:
        typer.echo("Coverage info not found in the report. Skipping status update.")
        return

    typer.echo(f"Coverage value is {coverage_val}")
    typer.echo("Updating commit status...")

    status_state = "success" if coverage_val >= coverage_threshold else "failure"

    status_url = (
        f"https://api.github.com/repos/{repo_owner}/{repo_name}/statuses/{commit_sha}"
    )
    status_data = {
        "state": status_state,
        "description": f"Coverage {coverage_val}%",
        "target_url": f"{api_url}/coverage/{session['site_id']}/",
        "context": "coverage",
    }
    status_resp = await _request(
        "POST",
        status_url,
        headers={
            "Authorization": f"Bearer {gh_token}",
            "Accept": "application/vnd.github+json",
        },
        json=status_data,
    )

    if status_resp.status_code != 201:
        typer.echo(f"Failed to set commit status: {status_resp.status_code}", err=True)
        typer.echo(f"Response: {status_resp.text}", err=True)
        raise typer.Exit(1)

    typer.echo("Commit status set successfully")

    if not purge_cache:
        typer.echo("No `--purge-cache` set, skip purging Camo cache")
        return

    # Clear badge cache. A failure here is reported but does not stop the
    # Camo purge below.
    resp = await _request(
        "POST",
        f"{api_url}/coverage/invalidate-cache/{repo_owner}/{repo_name}/",
        headers={"token": api_key},
    )
    if resp.status_code == 200:
        typer.echo("Cache invalidated successfully")
    else:
        typer.echo(f"Failed to invalidate cache: {resp.status_code}", err=True)
        typer.echo(f"Response: {resp.text}", err=True)

    # Purge github Camo cache for the badge: find the <img> in the rendered
    # README whose data-canonical-src starts with api_url and capture its
    # camo.githubusercontent.com src.
    badge_url_re = re.compile(
        rf'<img\s[^>]*src="(https://camo\.githubusercontent\.com/[a-f0-9]+/[a-zA-Z0-9]+)"[^>]*data-canonical-src="{re.escape(api_url)}[^"]*"'
    )

    readme_url = f"https://api.github.com/repos/{repo_owner}/{repo_name}/readme"
    readme_resp = await _request(
        "GET",
        readme_url,
        headers={
            "Authorization": f"Bearer {gh_token}",
            # Ask for the README rendered as HTML so the <img> tags are present.
            "Accept": "application/vnd.github.html+json",
        },
        timeout=30,
    )
    if readme_resp.status_code != 200:
        typer.echo(f"Failed to fetch README: {readme_resp.status_code}", err=True)
        return

    match = badge_url_re.search(readme_resp.text)
    if not match:
        typer.echo("Badge URL not found in README")
        return

    badge_url = match.group(1)
    typer.echo(f"Purging Camo cache for: {badge_url}")
    purge_resp = await _request("PURGE", badge_url, timeout=30)
    if purge_resp.status_code == 200:
        typer.echo("Camo cache purged successfully")
    else:
        typer.echo(f"Failed to purge Camo cache: {purge_resp.status_code}", err=True)

198 

199 

@app.command()
def upload(
    *,
    directory: Annotated[
        Path,
        typer.Argument(
            help="Directory to upload", dir_okay=True, file_okay=False, exists=True
        ),
    ],
    api_url: Annotated[
        str, typer.Option(envvar="COVERED_API_URL", help="Backend API base URL")
    ],
    api_key: Annotated[
        str, typer.Option(envvar="COVERED_API_KEY", help="API key for authentication")
    ],
    concurrency: Annotated[
        int,
        # min=1 rejects 0 and negative values at parse time; they would
        # otherwise reach asyncio.Semaphore and hang or crash the upload.
        typer.Option(help="Max concurrent uploads", min=1),
    ] = 50,
    repo_owner: Annotated[
        str, typer.Option(envvar="COVERED_REPO_OWNER", help="GitHub repository owner")
    ],
    repo_name: Annotated[
        str, typer.Option(envvar="COVERED_REPO_NAME", help="GitHub repository name")
    ],
    commit_sha: Annotated[
        str, typer.Option(envvar="COVERED_COMMIT_SHA", help="Git commit SHA")
    ],
    coverage_threshold: Annotated[
        float,
        typer.Option(
            envvar="COVERED_COVERAGE_THRESHOLD",
            help="Minimum coverage percentage to set success status",
        ),
    ] = 100.0,
    gh_token: Annotated[
        str,
        typer.Option(
            envvar="COVERED_GH_TOKEN", help="GitHub token for setting commit status"
        ),
    ],
    is_default_branch: Annotated[
        bool | None,
        typer.Option(
            envvar="COVERED_IS_DEFAULT_BRANCH",
            help="(DEPRECATED, use `--purge-cache` instead) Whether this is the default branch (enables cache invalidation)",
            show_default="False",
        ),
    ] = None,
    purge_cache: Annotated[
        bool,
        typer.Option(
            envvar="COVERED_PURGE_CACHE",
            help="Whether to purge the GitHub Camo cache after upload (to prevent showing stale badge)",
        ),
    ] = False,
) -> None:
    """Upload a directory to a temporary site."""
    # The docstring above is kept to one line on purpose: Typer shows the
    # command function's docstring as its --help text.
    #
    # Flow: warn if the deprecated flag was given, merge it with
    # --purge-cache, validate --api-url, then run the async pipeline in _main.

    if is_default_branch is not None:
        # NOTE(review): Python's default filters hide DeprecationWarning
        # unless it is triggered from __main__, so users of an installed
        # entry point may never see this. Consider FutureWarning or an
        # explicit message on stderr.
        warnings.warn(
            "The `--is-default-branch` option is deprecated and will be removed in a future release. Please use `--purge-cache` instead.",
            DeprecationWarning,
        )

    # The deprecated flag only ever enables purging; it cannot switch off an
    # explicit --purge-cache.
    use_purge_cache = purge_cache or (is_default_branch is True)

    # URLs are built as f"{api_url}/coverage/...", so a trailing slash would
    # produce a double slash.
    if api_url.endswith("/"):
        raise typer.BadParameter("must not end with a slash", param_hint="--api-url")

    asyncio.run(
        _main(
            directory=directory,
            api_url=api_url,
            api_key=api_key,
            concurrency=concurrency,
            repo_owner=repo_owner,
            repo_name=repo_name,
            commit_sha=commit_sha,
            coverage_threshold=coverage_threshold,
            gh_token=gh_token,
            purge_cache=use_purge_cache,
        )
    )

281 

282 

if __name__ == "__main__":
    # Run the Typer CLI when this module is executed directly.
    app()