# Coverage for backend/tests/test_badge.py: 100% (265 statements)
# Report generated by coverage.py v7.13.5, created at 2026-05-02 15:51 +0000
from typing import Any
from unittest.mock import AsyncMock
import uuid

import httpx
import pytest
import respx
from fastapi.testclient import TestClient
from redis import RedisError
# Module-wide respx marker: configures the ``respx_mock`` fixture with the
# GitHub API base URL, so tests can register routes by relative path.
pytestmark = pytest.mark.respx(base_url="https://api.github.com")
13def get_commit(sha: str | None = None, skip_ci: bool = False) -> dict[str, Any]:
14 if sha is None:
15 sha = uuid.uuid4().hex
16 message = uuid.uuid4().hex
17 if skip_ci:
18 message += "\n\n[skip ci]"
19 return {"sha": sha, "commit": {"message": message}}
# Commit status payloads returned by the mocked statuses endpoint.

# Successful status under the "coverage/project" context.
COVERAGE_STATUS = {
    "state": "success",
    "description": "87% coverage",
    "target_url": "https://example.com/coverage/report",
    "context": "coverage/project",
}

# Successful status under an unrelated context.
NON_COVERAGE_STATUS = {
    "state": "success",
    "description": "example status",
    "target_url": "https://example.com/",
    "context": "other/status",
}

# Failed status under the "coverage/project" context.
FAILED_COVERAGE_STATUS = {
    "state": "failure",
    "description": "42% coverage",
    "target_url": "https://example.com/coverage/report",
    "context": "coverage/project",
}
class TestBadge:
    """Tests for ``GET /badge/owner/repo.svg``.

    Every test stubs the GitHub commits/statuses API through ``respx_mock``
    and the Redis client through ``mock_redis``, then inspects the response
    returned by the test ``client``.

    NOTE(review): tests that leave a route uncalled use two different forms,
    ``@pytest.mark.respx(..., assert_all_called=False)`` and
    ``@respx.mock(assert_all_called=False)``. Confirm which one is intended
    and whether the module-level ``base_url`` should apply to both.
    """

    def test_returns_green_svg_for_successful_coverage(
        self,
        client: TestClient,
        respx_mock: respx.MockRouter,
        mock_redis: AsyncMock,
    ):
        """A successful coverage status yields a green SVG with the percentage."""
        mock_redis.get.return_value = None
        commit = get_commit()
        respx_mock.get("/repos/owner/repo/commits").respond(json=[commit])
        respx_mock.get(f"/repos/owner/repo/statuses/{commit['sha']}").respond(
            json=[COVERAGE_STATUS]
        )

        resp = client.get("/badge/owner/repo.svg")

        assert resp.status_code == 200
        assert resp.headers["content-type"] == "image/svg+xml"
        assert "coverage: 87%" in resp.text
        assert "#34D058" in resp.text  # green

    def test_returns_red_svg_for_failed_coverage(
        self,
        client: TestClient,
        respx_mock: respx.MockRouter,
        mock_redis: AsyncMock,
    ):
        """A failed coverage status yields a red SVG with the percentage."""
        mock_redis.get.return_value = None
        commit = get_commit()
        respx_mock.get("/repos/owner/repo/commits").respond(json=[commit])
        respx_mock.get(f"/repos/owner/repo/statuses/{commit['sha']}").respond(
            json=[FAILED_COVERAGE_STATUS]
        )

        resp = client.get("/badge/owner/repo.svg")

        assert "coverage: 42%" in resp.text
        assert "#CB2431" in resp.text  # red

    def test_returns_gray_badge_when_no_coverage_status(
        self,
        client: TestClient,
        respx_mock: respx.MockRouter,
        mock_redis: AsyncMock,
    ):
        """A commit with no statuses yields a gray ``??%`` badge."""
        mock_redis.get.return_value = None
        commit = get_commit()
        respx_mock.get("/repos/owner/repo/commits").respond(json=[commit])
        respx_mock.get(f"/repos/owner/repo/statuses/{commit['sha']}").respond(json=[])

        resp = client.get("/badge/owner/repo.svg")

        assert "coverage: ??%" in resp.text
        assert "#9F9F9F" in resp.text  # gray

    def test_returns_gray_badge_when_no_commits(
        self,
        client: TestClient,
        respx_mock: respx.MockRouter,
        mock_redis: AsyncMock,
    ):
        """An empty commit list yields a ``??%`` badge."""
        mock_redis.get.return_value = None
        respx_mock.get("/repos/owner/repo/commits").respond(json=[])

        resp = client.get("/badge/owner/repo.svg")

        assert "coverage: ??%" in resp.text

    # assert_all_called=False: the commits route must stay uncalled on a cache hit.
    @pytest.mark.respx(base_url="https://api.github.com", assert_all_called=False)
    def test_serves_cached_badge_from_redis(
        self,
        client: TestClient,
        respx_mock: respx.MockRouter,
        mock_redis: AsyncMock,
    ):
        """A cached SVG is returned verbatim without calling GitHub."""
        mock_redis.get.return_value = b"<svg>cached</svg>"
        commits_route = respx_mock.get("/repos/owner/repo/commits")

        resp = client.get("/badge/owner/repo.svg")

        assert resp.text == "<svg>cached</svg>"
        assert not commits_route.called

    def test_caches_generated_badge_in_redis(
        self,
        client: TestClient,
        respx_mock: respx.MockRouter,
        mock_redis: AsyncMock,
    ):
        """A freshly generated badge is written to Redis with a 60 s expiry."""
        mock_redis.get.return_value = None
        commit = get_commit()
        respx_mock.get("/repos/owner/repo/commits").respond(json=[commit])
        respx_mock.get(f"/repos/owner/repo/statuses/{commit['sha']}").respond(
            json=[COVERAGE_STATUS]
        )

        client.get("/badge/owner/repo.svg")

        mock_redis.set.assert_awaited_once()
        key, svg_bytes = mock_redis.set.call_args.args
        assert key == "cache:badge:owner:repo"
        assert b"coverage: 87%" in svg_bytes
        ex = mock_redis.set.call_args.kwargs.get("ex")
        assert ex == 60

    def test_no_store_headers_for_github_camo(
        self,
        client: TestClient,
        respx_mock: respx.MockRouter,
        mock_redis: AsyncMock,
    ):
        """Requests with a ``github-camo`` user agent get no-store cache headers."""
        mock_redis.get.return_value = None
        commit = get_commit()
        respx_mock.get("/repos/owner/repo/commits").respond(json=[commit])
        respx_mock.get(f"/repos/owner/repo/statuses/{commit['sha']}").respond(
            json=[COVERAGE_STATUS]
        )

        resp = client.get(
            "/badge/owner/repo.svg",
            headers={"user-agent": "github-camo/abc123"},
        )

        assert resp.headers["cache-control"] == "private, no-store"
        assert resp.headers["cdn-cache-control"] == "no-store"

    def test_public_cache_headers_for_regular_clients(
        self,
        client: TestClient,
        respx_mock: respx.MockRouter,
        mock_redis: AsyncMock,
    ):
        """Requests with the default user agent get short public cache headers."""
        mock_redis.get.return_value = None
        commit = get_commit()
        respx_mock.get("/repos/owner/repo/commits").respond(json=[commit])
        respx_mock.get(f"/repos/owner/repo/statuses/{commit['sha']}").respond(
            json=[COVERAGE_STATUS]
        )

        resp = client.get("/badge/owner/repo.svg")

        assert resp.headers["cache-control"] == "public, max-age=10"
        assert resp.headers["cdn-cache-control"] == "max-age=10"

    def test_redis_read_error_falls_through_to_github(
        self,
        client: TestClient,
        respx_mock: respx.MockRouter,
        mock_redis: AsyncMock,
    ):
        """A ``RedisError`` on cache read still produces a badge from GitHub data."""
        mock_redis.get.side_effect = RedisError("connection refused")
        commit = get_commit()
        respx_mock.get("/repos/owner/repo/commits").respond(json=[commit])
        respx_mock.get(f"/repos/owner/repo/statuses/{commit['sha']}").respond(
            json=[COVERAGE_STATUS]
        )

        resp = client.get("/badge/owner/repo.svg")

        assert resp.status_code == 200
        assert "coverage: 87%" in resp.text

    def test_redis_write_error_still_returns_badge(
        self,
        client: TestClient,
        respx_mock: respx.MockRouter,
        mock_redis: AsyncMock,
    ):
        """A ``RedisError`` on cache write does not prevent the badge response."""
        mock_redis.get.return_value = None
        mock_redis.set.side_effect = RedisError("connection refused")
        commit = get_commit()
        respx_mock.get("/repos/owner/repo/commits").respond(json=[commit])
        respx_mock.get(f"/repos/owner/repo/statuses/{commit['sha']}").respond(
            json=[COVERAGE_STATUS]
        )

        resp = client.get("/badge/owner/repo.svg")

        assert resp.status_code == 200
        assert "coverage: 87%" in resp.text

    # assert_all_called=False: the skip-ci status route must stay uncalled.
    @respx.mock(assert_all_called=False)
    def test_skips_skip_ci_commits(
        self,
        client: TestClient,
        respx_mock: respx.MockRouter,
        mock_redis: AsyncMock,
    ):
        """Statuses are fetched for the first commit without ``[skip ci]`` only."""
        mock_redis.get.return_value = None
        skip_ci_commit = get_commit(skip_ci=True)
        commit_2 = get_commit()

        commits_route = respx_mock.get("/repos/owner/repo/commits").respond(
            json=[skip_ci_commit, commit_2]
        )
        commit_with_status_route = respx_mock.get(
            f"/repos/owner/repo/statuses/{commit_2['sha']}"
        ).respond(json=[COVERAGE_STATUS])
        # The route below should not be called since the commit has "skip ci"
        skip_ci_commit_status_route = respx_mock.get(
            f"/repos/owner/repo/statuses/{skip_ci_commit['sha']}"
        )

        resp = client.get("/badge/owner/repo.svg")

        assert "coverage: 87%" in resp.text

        assert commits_route.called
        assert commit_with_status_route.called
        assert not skip_ci_commit_status_route.called

    # assert_all_called=False: the four skip-ci status routes must stay uncalled.
    @respx.mock(assert_all_called=False)
    def test_check_up_to_5_commits(
        self,
        client: TestClient,
        respx_mock: respx.MockRouter,
        mock_redis: AsyncMock,
    ):
        """The fifth commit is used when the first four carry ``[skip ci]``."""
        mock_redis.get.return_value = None

        commits = [get_commit(skip_ci=True) for _ in range(4)]
        commits.append(get_commit())

        respx_mock.get("/repos/owner/repo/commits").respond(json=commits)

        # Fifth commit has coverage status, the first 4 have "skip ci" and should be skipped
        respx_mock.get(f"/repos/owner/repo/statuses/{commits[4]['sha']}").respond(
            json=[COVERAGE_STATUS]
        )

        # The routes below should not be called since those commits have "skip ci"
        paths = tuple(
            f"/repos/owner/repo/statuses/{commit['sha']}" for commit in commits[:4]
        )
        skip_ci_commits_routes = respx_mock.route(path__in=paths)

        resp = client.get("/badge/owner/repo.svg")

        assert "coverage: 87%" in resp.text

        assert not skip_ci_commits_routes.called

    def test_commit_without_coverage_status(
        self,
        client: TestClient,
        respx_mock: respx.MockRouter,
        mock_redis: AsyncMock,
    ):
        """A commit with only a non-coverage status yields a ``??%`` badge."""
        mock_redis.get.return_value = None
        commit = get_commit()
        respx_mock.get("/repos/owner/repo/commits").respond(json=[commit])
        respx_mock.get(f"/repos/owner/repo/statuses/{commit['sha']}").respond(
            json=[NON_COVERAGE_STATUS]
        )

        resp = client.get("/badge/owner/repo.svg")

        assert "coverage: ??%" in resp.text

    # assert_all_called=False: the second commit's status route must stay uncalled.
    @respx.mock(assert_all_called=False)
    def test_stop_on_first_commit_without_skip_ci(
        self,
        client: TestClient,
        respx_mock: respx.MockRouter,
        mock_redis: AsyncMock,
    ):
        """Later commits are not consulted once a non-skip-ci commit is found."""
        mock_redis.get.return_value = None

        commit_without_coverage_status = get_commit()
        commit_with_coverage_status = get_commit()

        respx_mock.get("/repos/owner/repo/commits").respond(
            json=[commit_without_coverage_status, commit_with_coverage_status]
        )
        respx_mock.get(
            f"/repos/owner/repo/statuses/{commit_without_coverage_status['sha']}"
        ).respond(json=[NON_COVERAGE_STATUS])

        # The route below should not be called
        commit_2_route = respx_mock.get(
            f"/repos/owner/repo/statuses/{commit_with_coverage_status['sha']}"
        )

        resp = client.get("/badge/owner/repo.svg")

        assert "coverage: ??%" in resp.text

        assert not commit_2_route.called

    def test_retry_on_github_api_failure(
        self,
        client: TestClient,
        respx_mock: respx.MockRouter,
        mock_redis: AsyncMock,
    ):
        """Both GitHub calls succeed on the third attempt after two failures."""
        mock_redis.get.return_value = None
        commit = get_commit()
        commits_route = respx_mock.get("/repos/owner/repo/commits").mock(
            side_effect=[
                httpx.ConnectError("boom"),  # 1st call: network error
                httpx.Response(  # 2nd call: server error
                    500, json={"message": "server error"}
                ),
                httpx.Response(200, json=[commit]),  # 3rd call: success
            ]
        )

        status_route = respx_mock.get(
            f"/repos/owner/repo/statuses/{commit['sha']}"
        ).mock(
            side_effect=[
                httpx.ConnectError("boom"),  # 1st call: network error
                httpx.Response(  # 2nd call: server error
                    500, json={"message": "server error"}
                ),
                httpx.Response(200, json=[COVERAGE_STATUS]),  # 3rd call: success
            ]
        )

        resp = client.get("/badge/owner/repo.svg")

        assert resp.status_code == 200
        assert "coverage: 87%" in resp.text

        assert commits_route.call_count == 3
        assert status_route.call_count == 3

    def test_server_error_on_github_api_commit_route_failure(
        self,
        client: TestClient,
        respx_mock: respx.MockRouter,
        mock_redis: AsyncMock,
    ):
        """A persistent 500 from the commits route raises after three attempts."""
        mock_redis.get.return_value = None
        commits_route = respx_mock.get("/repos/owner/repo/commits").respond(
            status_code=500
        )

        with pytest.raises(httpx.HTTPStatusError):
            client.get("/badge/owner/repo.svg")

        assert commits_route.call_count == 3  # Retries up to 3 times on failure

    def test_server_error_on_github_api_status_route_failure(
        self,
        client: TestClient,
        respx_mock: respx.MockRouter,
        mock_redis: AsyncMock,
    ):
        """A persistent 500 from the statuses route raises after three attempts."""
        mock_redis.get.return_value = None
        commit = get_commit()
        respx_mock.get("/repos/owner/repo/commits").respond(json=[commit])
        status_route = respx_mock.get(
            f"/repos/owner/repo/statuses/{commit['sha']}"
        ).respond(status_code=500)

        with pytest.raises(httpx.HTTPStatusError):
            client.get("/badge/owner/repo.svg")

        assert status_route.call_count == 3  # Retries up to 3 times on failure

    def test_dont_write_cache_on_github_api_failure(
        self,
        client: TestClient,
        respx_mock: respx.MockRouter,
        mock_redis: AsyncMock,
    ):
        """Nothing is written to Redis when the GitHub request fails."""
        mock_redis.get.return_value = None
        commit = get_commit()
        respx_mock.get("/repos/owner/repo/commits").respond(json=[commit])
        respx_mock.get(f"/repos/owner/repo/statuses/{commit['sha']}").respond(
            status_code=500
        )

        with pytest.raises(httpx.HTTPStatusError):
            client.get("/badge/owner/repo.svg")

        assert not mock_redis.set.called  # Don't cache failed responses
class TestBadgeRedirect:
    """Tests for ``GET /badge/redirect/owner/repo/``.

    Every test stubs the GitHub commits/statuses API through ``respx_mock``
    and checks either the redirect target or the error outcome.
    """

    def test_redirects_to_target_url(
        self,
        client: TestClient,
        respx_mock: respx.MockRouter,
    ):
        """A successful coverage status redirects (307) to its ``target_url``."""
        commit = get_commit()
        respx_mock.get("/repos/owner/repo/commits").respond(json=[commit])
        respx_mock.get(f"/repos/owner/repo/statuses/{commit['sha']}").respond(
            json=[COVERAGE_STATUS]
        )

        resp = client.get("/badge/redirect/owner/repo/", follow_redirects=False)

        assert resp.status_code == 307
        assert resp.headers["location"] == COVERAGE_STATUS["target_url"]

    def test_redirects_for_failed_coverage(
        self,
        client: TestClient,
        respx_mock: respx.MockRouter,
    ):
        """A failed coverage status still redirects (307) to its ``target_url``."""
        commit = get_commit()
        respx_mock.get("/repos/owner/repo/commits").respond(json=[commit])
        respx_mock.get(f"/repos/owner/repo/statuses/{commit['sha']}").respond(
            json=[FAILED_COVERAGE_STATUS]
        )

        resp = client.get("/badge/redirect/owner/repo/", follow_redirects=False)

        assert resp.status_code == 307
        assert resp.headers["location"] == FAILED_COVERAGE_STATUS["target_url"]

    def test_returns_404_when_no_coverage_status(
        self,
        client: TestClient,
        respx_mock: respx.MockRouter,
    ):
        """A commit with no statuses results in a 404."""
        commit = get_commit()
        respx_mock.get("/repos/owner/repo/commits").respond(json=[commit])
        respx_mock.get(f"/repos/owner/repo/statuses/{commit['sha']}").respond(json=[])

        resp = client.get("/badge/redirect/owner/repo/")

        assert resp.status_code == 404

    def test_returns_404_when_no_commits(
        self,
        client: TestClient,
        respx_mock: respx.MockRouter,
    ):
        """An empty commit list results in a 404."""
        respx_mock.get("/repos/owner/repo/commits").respond(json=[])

        resp = client.get("/badge/redirect/owner/repo/")

        assert resp.status_code == 404

    def test_returns_404_when_commit_has_only_non_coverage_status(
        self,
        client: TestClient,
        respx_mock: respx.MockRouter,
    ):
        """A commit with only a non-coverage status results in a 404."""
        commit = get_commit()
        respx_mock.get("/repos/owner/repo/commits").respond(json=[commit])
        respx_mock.get(f"/repos/owner/repo/statuses/{commit['sha']}").respond(
            json=[NON_COVERAGE_STATUS]
        )

        resp = client.get("/badge/redirect/owner/repo/")

        assert resp.status_code == 404

    # assert_all_called=False: the skip-ci status route must stay uncalled.
    @respx.mock(assert_all_called=False)
    def test_skips_skip_ci_commits(
        self,
        client: TestClient,
        respx_mock: respx.MockRouter,
    ):
        """Statuses are fetched for the first commit without ``[skip ci]`` only."""
        skip_ci_commit = get_commit(skip_ci=True)
        commit_2 = get_commit()

        commits_route = respx_mock.get("/repos/owner/repo/commits").respond(
            json=[skip_ci_commit, commit_2]
        )
        commit_with_status_route = respx_mock.get(
            f"/repos/owner/repo/statuses/{commit_2['sha']}"
        ).respond(json=[COVERAGE_STATUS])
        # The route below should not be called since the commit has "skip ci"
        skip_ci_commit_status_route = respx_mock.get(
            f"/repos/owner/repo/statuses/{skip_ci_commit['sha']}"
        )

        resp = client.get("/badge/redirect/owner/repo/", follow_redirects=False)

        assert resp.status_code == 307
        assert resp.headers["location"] == COVERAGE_STATUS["target_url"]

        assert commits_route.called
        assert commit_with_status_route.called
        assert not skip_ci_commit_status_route.called

    # assert_all_called=False: the four skip-ci status routes must stay uncalled.
    @respx.mock(assert_all_called=False)
    def test_check_up_to_5_commits(
        self,
        client: TestClient,
        respx_mock: respx.MockRouter,
    ):
        """The fifth commit is used when the first four carry ``[skip ci]``."""
        commits = [get_commit(skip_ci=True) for _ in range(4)]
        commits.append(get_commit())

        respx_mock.get("/repos/owner/repo/commits").respond(json=commits)

        # Fifth commit has coverage status, the first 4 have "skip ci" and should be skipped
        respx_mock.get(f"/repos/owner/repo/statuses/{commits[4]['sha']}").respond(
            json=[COVERAGE_STATUS]
        )

        # The routes below should not be called since those commits have "skip ci"
        paths = tuple(
            f"/repos/owner/repo/statuses/{commit['sha']}" for commit in commits[:4]
        )
        skip_ci_commits_routes = respx_mock.route(path__in=paths)

        resp = client.get("/badge/redirect/owner/repo/", follow_redirects=False)

        assert resp.status_code == 307
        assert resp.headers["location"] == COVERAGE_STATUS["target_url"]

        assert not skip_ci_commits_routes.called

    # assert_all_called=False: the second commit's status route must stay uncalled.
    @respx.mock(assert_all_called=False)
    def test_stop_on_first_commit_without_skip_ci(
        self,
        client: TestClient,
        respx_mock: respx.MockRouter,
    ):
        """Later commits are not consulted once a non-skip-ci commit is found."""
        commit_without_coverage_status = get_commit()
        commit_with_coverage_status = get_commit()

        respx_mock.get("/repos/owner/repo/commits").respond(
            json=[commit_without_coverage_status, commit_with_coverage_status]
        )
        respx_mock.get(
            f"/repos/owner/repo/statuses/{commit_without_coverage_status['sha']}"
        ).respond(json=[NON_COVERAGE_STATUS])

        # The route below should not be called
        commit_2_route = respx_mock.get(
            f"/repos/owner/repo/statuses/{commit_with_coverage_status['sha']}"
        )

        resp = client.get("/badge/redirect/owner/repo/")

        assert resp.status_code == 404
        assert not commit_2_route.called

    def test_retry_on_github_api_failure(
        self,
        client: TestClient,
        respx_mock: respx.MockRouter,
    ):
        """Both GitHub calls succeed on the third attempt after two failures."""
        commit = get_commit()
        commits_route = respx_mock.get("/repos/owner/repo/commits").mock(
            side_effect=[
                httpx.ConnectError("boom"),  # 1st call: network error
                httpx.Response(  # 2nd call: server error
                    500, json={"message": "server error"}
                ),
                httpx.Response(200, json=[commit]),  # 3rd call: success
            ]
        )

        status_route = respx_mock.get(
            f"/repos/owner/repo/statuses/{commit['sha']}"
        ).mock(
            side_effect=[
                httpx.ConnectError("boom"),  # 1st call: network error
                httpx.Response(  # 2nd call: server error
                    500, json={"message": "server error"}
                ),
                httpx.Response(200, json=[COVERAGE_STATUS]),  # 3rd call: success
            ]
        )

        resp = client.get("/badge/redirect/owner/repo/", follow_redirects=False)

        assert resp.status_code == 307
        assert resp.headers["location"] == COVERAGE_STATUS["target_url"]

        assert commits_route.call_count == 3
        assert status_route.call_count == 3

    def test_server_error_on_github_api_commit_route_failure(
        self,
        client: TestClient,
        respx_mock: respx.MockRouter,
    ):
        """A persistent 500 from the commits route raises after three attempts."""
        commits_route = respx_mock.get("/repos/owner/repo/commits").respond(
            status_code=500
        )

        with pytest.raises(httpx.HTTPStatusError):
            client.get("/badge/redirect/owner/repo/")

        assert commits_route.call_count == 3  # Retries up to 3 times on failure

    def test_server_error_on_github_api_status_route_failure(
        self,
        client: TestClient,
        respx_mock: respx.MockRouter,
    ):
        """A persistent 500 from the statuses route raises after three attempts."""
        commit = get_commit()
        respx_mock.get("/repos/owner/repo/commits").respond(json=[commit])
        status_route = respx_mock.get(
            f"/repos/owner/repo/statuses/{commit['sha']}"
        ).respond(status_code=500)

        with pytest.raises(httpx.HTTPStatusError):
            client.get("/badge/redirect/owner/repo/")

        assert status_route.call_count == 3  # Retries up to 3 times on failure