Coverage for backend / tests / test_badge.py: 100%
274 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-06-19 09:10 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-06-19 09:10 +0000
1from typing import Any
2from unittest.mock import AsyncMock
3import uuid
5import httpx
6import pytest
7import respx
8from fastapi.testclient import TestClient
9from redis import RedisError
# Default respx settings for every test in this module: relative route paths
# such as "/repos/owner/repo/commits" are matched against the GitHub API host.
pytestmark = pytest.mark.respx(base_url="https://api.github.com")
14def get_commit(sha: str | None = None, skip_ci: bool = False) -> dict[str, Any]:
15 if sha is None:
16 sha = uuid.uuid4().hex
17 message = uuid.uuid4().hex
18 if skip_ci:
19 message += "\n\n[skip ci]"
20 return {"sha": sha, "commit": {"message": message}}
# Commit status reporting passing coverage; its context is "coverage/project".
COVERAGE_STATUS: dict[str, str] = {
    "state": "success",
    "description": "87% coverage",
    "target_url": "https://example.com/coverage/report",
    "context": "coverage/project",
}

# Successful commit status whose context is unrelated to coverage.
NON_COVERAGE_STATUS: dict[str, str] = {
    "state": "success",
    "description": "example status",
    "target_url": "https://example.com/",
    "context": "other/status",
}

# Commit status reporting failing coverage; same context as COVERAGE_STATUS.
FAILED_COVERAGE_STATUS: dict[str, str] = {
    "state": "failure",
    "description": "42% coverage",
    "target_url": "https://example.com/coverage/report",
    "context": "coverage/project",
}
class TestBadge:
    """Tests for ``GET /badge/owner/repo.svg`` with GitHub mocked through respx."""

    def test_returns_green_svg_for_successful_coverage(
        self,
        client: TestClient,
        respx_mock: respx.MockRouter,
        mock_redis: AsyncMock,
    ):
        """A successful coverage status yields a green SVG with its percentage."""
        mock_redis.get.return_value = None
        commit = get_commit()
        respx_mock.get("/repos/owner/repo/commits").respond(json=[commit])
        respx_mock.get(f"/repos/owner/repo/statuses/{commit['sha']}").respond(
            json=[COVERAGE_STATUS]
        )

        resp = client.get("/badge/owner/repo.svg")

        assert resp.status_code == 200
        assert resp.headers["content-type"] == "image/svg+xml"
        assert "coverage: 87%" in resp.text
        assert "#34D058" in resp.text  # green

    def test_returns_red_svg_for_failed_coverage(
        self,
        client: TestClient,
        respx_mock: respx.MockRouter,
        mock_redis: AsyncMock,
    ):
        """A failed coverage status yields a red SVG with its percentage."""
        mock_redis.get.return_value = None
        commit = get_commit()
        respx_mock.get("/repos/owner/repo/commits").respond(json=[commit])
        respx_mock.get(f"/repos/owner/repo/statuses/{commit['sha']}").respond(
            json=[FAILED_COVERAGE_STATUS]
        )

        resp = client.get("/badge/owner/repo.svg")

        assert "coverage: 42%" in resp.text
        assert "#CB2431" in resp.text  # red

    def test_returns_gray_badge_when_no_coverage_status(
        self,
        client: TestClient,
        respx_mock: respx.MockRouter,
        mock_redis: AsyncMock,
    ):
        """A commit with no statuses at all yields the gray ``??%`` badge."""
        mock_redis.get.return_value = None
        commit = get_commit()
        respx_mock.get("/repos/owner/repo/commits").respond(json=[commit])
        respx_mock.get(f"/repos/owner/repo/statuses/{commit['sha']}").respond(json=[])

        resp = client.get("/badge/owner/repo.svg")

        assert "coverage: ??%" in resp.text
        assert "#9F9F9F" in resp.text  # gray

    def test_returns_gray_badge_when_no_commits(
        self,
        client: TestClient,
        respx_mock: respx.MockRouter,
        mock_redis: AsyncMock,
    ):
        """An empty commit list yields the ``??%`` badge."""
        mock_redis.get.return_value = None
        respx_mock.get("/repos/owner/repo/commits").respond(json=[])

        resp = client.get("/badge/owner/repo.svg")

        assert "coverage: ??%" in resp.text

    @pytest.mark.respx(base_url="https://api.github.com", assert_all_called=False)
    def test_serves_cached_badge_from_redis(
        self,
        client: TestClient,
        respx_mock: respx.MockRouter,
        mock_redis: AsyncMock,
    ):
        """A Redis hit is returned as-is and GitHub is never queried."""
        mock_redis.get.return_value = b"<svg>cached</svg>"
        commits_route = respx_mock.get("/repos/owner/repo/commits")

        resp = client.get("/badge/owner/repo.svg")

        assert resp.text == "<svg>cached</svg>"
        assert not commits_route.called

    def test_caches_generated_badge_in_redis(
        self,
        client: TestClient,
        respx_mock: respx.MockRouter,
        mock_redis: AsyncMock,
    ):
        """A freshly generated badge is written to Redis with a 60 second expiry."""
        mock_redis.get.return_value = None
        commit = get_commit()
        respx_mock.get("/repos/owner/repo/commits").respond(json=[commit])
        respx_mock.get(f"/repos/owner/repo/statuses/{commit['sha']}").respond(
            json=[COVERAGE_STATUS]
        )

        client.get("/badge/owner/repo.svg")

        mock_redis.set.assert_awaited_once()
        key, svg_bytes = mock_redis.set.call_args.args
        assert key == "cache:badge:owner:repo"
        assert b"coverage: 87%" in svg_bytes
        ex = mock_redis.set.call_args.kwargs.get("ex")
        assert ex == 60

    def test_no_store_headers_for_github_camo(
        self,
        client: TestClient,
        respx_mock: respx.MockRouter,
        mock_redis: AsyncMock,
    ):
        """Requests with a ``github-camo`` user agent get no-store cache headers."""
        mock_redis.get.return_value = None
        commit = get_commit()
        respx_mock.get("/repos/owner/repo/commits").respond(json=[commit])
        respx_mock.get(f"/repos/owner/repo/statuses/{commit['sha']}").respond(
            json=[COVERAGE_STATUS]
        )

        resp = client.get(
            "/badge/owner/repo.svg",
            headers={"user-agent": "github-camo/abc123"},
        )

        assert resp.headers["cache-control"] == "private, no-store"
        assert resp.headers["cdn-cache-control"] == "no-store"

    def test_public_cache_headers_for_regular_clients(
        self,
        client: TestClient,
        respx_mock: respx.MockRouter,
        mock_redis: AsyncMock,
    ):
        """Requests without a special user agent get short public cache headers."""
        mock_redis.get.return_value = None
        commit = get_commit()
        respx_mock.get("/repos/owner/repo/commits").respond(json=[commit])
        respx_mock.get(f"/repos/owner/repo/statuses/{commit['sha']}").respond(
            json=[COVERAGE_STATUS]
        )

        resp = client.get("/badge/owner/repo.svg")

        assert resp.headers["cache-control"] == "public, max-age=10"
        assert resp.headers["cdn-cache-control"] == "max-age=10"

    def test_redis_read_error_falls_through_to_github(
        self,
        client: TestClient,
        respx_mock: respx.MockRouter,
        mock_redis: AsyncMock,
    ):
        """A ``RedisError`` on read does not prevent building the badge."""
        mock_redis.get.side_effect = RedisError("connection refused")
        commit = get_commit()
        respx_mock.get("/repos/owner/repo/commits").respond(json=[commit])
        respx_mock.get(f"/repos/owner/repo/statuses/{commit['sha']}").respond(
            json=[COVERAGE_STATUS]
        )

        resp = client.get("/badge/owner/repo.svg")

        assert resp.status_code == 200
        assert "coverage: 87%" in resp.text

    def test_redis_write_error_still_returns_badge(
        self,
        client: TestClient,
        respx_mock: respx.MockRouter,
        mock_redis: AsyncMock,
    ):
        """A ``RedisError`` on write does not prevent returning the badge."""
        mock_redis.get.return_value = None
        mock_redis.set.side_effect = RedisError("connection refused")
        commit = get_commit()
        respx_mock.get("/repos/owner/repo/commits").respond(json=[commit])
        respx_mock.get(f"/repos/owner/repo/statuses/{commit['sha']}").respond(
            json=[COVERAGE_STATUS]
        )

        resp = client.get("/badge/owner/repo.svg")

        assert resp.status_code == 200
        assert "coverage: 87%" in resp.text

    # The respx_mock fixture is configured through this marker (as in
    # test_serves_cached_badge_from_redis) so the never-called route below is
    # tolerated; base_url is repeated because this marker replaces the
    # module-level one for this test.
    @pytest.mark.respx(base_url="https://api.github.com", assert_all_called=False)
    def test_skips_skip_ci_commits(
        self,
        client: TestClient,
        respx_mock: respx.MockRouter,
        mock_redis: AsyncMock,
    ):
        """Statuses are fetched for the first commit without ``[skip ci]``."""
        mock_redis.get.return_value = None
        skip_ci_commit = get_commit(skip_ci=True)
        commit_2 = get_commit()

        commits_route = respx_mock.get("/repos/owner/repo/commits").respond(
            json=[skip_ci_commit, commit_2]
        )
        commit_with_status_route = respx_mock.get(
            f"/repos/owner/repo/statuses/{commit_2['sha']}"
        ).respond(json=[COVERAGE_STATUS])
        # The route below should not be called since the commit has "skip ci"
        skip_ci_commit_status_route = respx_mock.get(
            f"/repos/owner/repo/statuses/{skip_ci_commit['sha']}"
        )

        resp = client.get("/badge/owner/repo.svg")

        assert "coverage: 87%" in resp.text

        assert commits_route.called
        assert commit_with_status_route.called
        assert not skip_ci_commit_status_route.called

    # Marker-based fixture configuration: the skip-ci routes are registered
    # only to prove they are never requested.
    @pytest.mark.respx(base_url="https://api.github.com", assert_all_called=False)
    def test_check_up_to_5_commits(
        self,
        client: TestClient,
        respx_mock: respx.MockRouter,
        mock_redis: AsyncMock,
    ):
        """Four ``[skip ci]`` commits are skipped and the fifth one is used."""
        mock_redis.get.return_value = None

        commits = [get_commit(skip_ci=True) for _ in range(4)]
        commits.append(get_commit())

        respx_mock.get("/repos/owner/repo/commits").respond(json=commits)

        # Fifth commit has coverage status, the first 4 have "skip ci" and should be skipped
        respx_mock.get(f"/repos/owner/repo/statuses/{commits[4]['sha']}").respond(
            json=[COVERAGE_STATUS]
        )

        # The routes below should not be called since those commits have "skip ci"
        paths = tuple(
            f"/repos/owner/repo/statuses/{commit['sha']}" for commit in commits[:4]
        )
        skip_ci_commits_routes = respx_mock.route(path__in=paths)

        resp = client.get("/badge/owner/repo.svg")

        assert "coverage: 87%" in resp.text

        assert not skip_ci_commits_routes.called

    def test_commit_without_coverage_status(
        self,
        client: TestClient,
        respx_mock: respx.MockRouter,
        mock_redis: AsyncMock,
    ):
        """A commit whose only status is unrelated to coverage yields ``??%``."""
        mock_redis.get.return_value = None
        commit = get_commit()
        respx_mock.get("/repos/owner/repo/commits").respond(json=[commit])
        respx_mock.get(f"/repos/owner/repo/statuses/{commit['sha']}").respond(
            json=[NON_COVERAGE_STATUS]
        )

        resp = client.get("/badge/owner/repo.svg")

        assert "coverage: ??%" in resp.text

    # Marker-based fixture configuration: the second commit's route is
    # registered only to prove it is never requested.
    @pytest.mark.respx(base_url="https://api.github.com", assert_all_called=False)
    def test_stop_on_first_commit_without_skip_ci(
        self,
        client: TestClient,
        respx_mock: respx.MockRouter,
        mock_redis: AsyncMock,
    ):
        """Only the first non-skip-ci commit is inspected, even without coverage."""
        mock_redis.get.return_value = None

        commit_without_coverage_status = get_commit()
        commit_with_coverage_status = get_commit()

        respx_mock.get("/repos/owner/repo/commits").respond(
            json=[commit_without_coverage_status, commit_with_coverage_status]
        )
        respx_mock.get(
            f"/repos/owner/repo/statuses/{commit_without_coverage_status['sha']}"
        ).respond(json=[NON_COVERAGE_STATUS])

        # The route below should not be called
        commit_2_route = respx_mock.get(
            f"/repos/owner/repo/statuses/{commit_with_coverage_status['sha']}"
        )

        resp = client.get("/badge/owner/repo.svg")

        assert "coverage: ??%" in resp.text

        assert not commit_2_route.called

    def test_retry_on_github_api_failure(
        self,
        client: TestClient,
        respx_mock: respx.MockRouter,
        mock_redis: AsyncMock,
    ):
        """Both GitHub calls succeed on the third attempt after two failures."""
        mock_redis.get.return_value = None
        commit = get_commit()
        commits_route = respx_mock.get("/repos/owner/repo/commits").mock(
            side_effect=[
                httpx.ConnectError("boom"),  # 1st call: network error
                httpx.Response(  # 2nd call: server error
                    500, json={"message": "server error"}
                ),
                httpx.Response(200, json=[commit]),  # 3rd call: success
            ]
        )

        status_route = respx_mock.get(
            f"/repos/owner/repo/statuses/{commit['sha']}"
        ).mock(
            side_effect=[
                httpx.ConnectError("boom"),  # 1st call: network error
                httpx.Response(  # 2nd call: server error
                    500, json={"message": "server error"}
                ),
                httpx.Response(200, json=[COVERAGE_STATUS]),  # 3rd call: success
            ]
        )

        resp = client.get("/badge/owner/repo.svg")

        assert resp.status_code == 200
        assert "coverage: 87%" in resp.text

        assert commits_route.call_count == 3
        assert status_route.call_count == 3

    def test_server_error_on_github_api_commit_route_failure(
        self,
        client: TestClient,
        respx_mock: respx.MockRouter,
        mock_redis: AsyncMock,
    ):
        """A persistently failing commits endpoint raises after three attempts."""
        mock_redis.get.return_value = None
        commits_route = respx_mock.get("/repos/owner/repo/commits").respond(
            status_code=500
        )

        with pytest.raises(httpx.HTTPStatusError):
            client.get("/badge/owner/repo.svg")

        assert commits_route.call_count == 3  # Retries up to 3 times on failure

    def test_server_error_on_github_api_status_route_failure(
        self,
        client: TestClient,
        respx_mock: respx.MockRouter,
        mock_redis: AsyncMock,
    ):
        """A persistently failing statuses endpoint raises after three attempts."""
        mock_redis.get.return_value = None
        commit = get_commit()
        respx_mock.get("/repos/owner/repo/commits").respond(json=[commit])
        status_route = respx_mock.get(
            f"/repos/owner/repo/statuses/{commit['sha']}"
        ).respond(status_code=500)

        with pytest.raises(httpx.HTTPStatusError):
            client.get("/badge/owner/repo.svg")

        assert status_route.call_count == 3  # Retries up to 3 times on failure

    def test_dont_write_cache_on_github_api_failure(
        self,
        client: TestClient,
        respx_mock: respx.MockRouter,
        mock_redis: AsyncMock,
    ):
        """Nothing is written to Redis when the GitHub request fails."""
        mock_redis.get.return_value = None
        commit = get_commit()
        respx_mock.get("/repos/owner/repo/commits").respond(json=[commit])
        respx_mock.get(f"/repos/owner/repo/statuses/{commit['sha']}").respond(
            status_code=500
        )

        with pytest.raises(httpx.HTTPStatusError):
            client.get("/badge/owner/repo.svg")

        assert not mock_redis.set.called  # Don't cache failed responses

    def test_get_badge_without_redis(
        self,
        client: TestClient,
        respx_mock: respx.MockRouter,
        mock_redis: AsyncMock,
        disable_redis: None,
    ):
        """With the ``disable_redis`` fixture active, Redis is never touched."""
        # Test that the badge endpoint works even if Redis is not configured
        commit = get_commit()
        respx_mock.get("/repos/owner/repo/commits").respond(json=[commit])
        respx_mock.get(f"/repos/owner/repo/statuses/{commit['sha']}").respond(
            json=[COVERAGE_STATUS]
        )

        resp = client.get("/badge/owner/repo.svg")

        assert resp.status_code == 200
        assert "coverage: 87%" in resp.text
        mock_redis.get.assert_not_called()
        mock_redis.set.assert_not_called()
class TestBadgeRedirect:
    """Tests for ``GET /badge/redirect/owner/repo/`` with GitHub mocked through respx."""

    def test_redirects_to_target_url(
        self,
        client: TestClient,
        respx_mock: respx.MockRouter,
    ):
        """A successful coverage status redirects (307) to its ``target_url``."""
        commit = get_commit()
        respx_mock.get("/repos/owner/repo/commits").respond(json=[commit])
        respx_mock.get(f"/repos/owner/repo/statuses/{commit['sha']}").respond(
            json=[COVERAGE_STATUS]
        )

        resp = client.get("/badge/redirect/owner/repo/", follow_redirects=False)

        assert resp.status_code == 307
        assert resp.headers["location"] == COVERAGE_STATUS["target_url"]

    def test_redirects_for_failed_coverage(
        self,
        client: TestClient,
        respx_mock: respx.MockRouter,
    ):
        """A failed coverage status still redirects (307) to its ``target_url``."""
        commit = get_commit()
        respx_mock.get("/repos/owner/repo/commits").respond(json=[commit])
        respx_mock.get(f"/repos/owner/repo/statuses/{commit['sha']}").respond(
            json=[FAILED_COVERAGE_STATUS]
        )

        resp = client.get("/badge/redirect/owner/repo/", follow_redirects=False)

        assert resp.status_code == 307
        assert resp.headers["location"] == FAILED_COVERAGE_STATUS["target_url"]

    def test_returns_404_when_no_coverage_status(
        self,
        client: TestClient,
        respx_mock: respx.MockRouter,
    ):
        """A commit with no statuses at all results in a 404."""
        commit = get_commit()
        respx_mock.get("/repos/owner/repo/commits").respond(json=[commit])
        respx_mock.get(f"/repos/owner/repo/statuses/{commit['sha']}").respond(json=[])

        resp = client.get("/badge/redirect/owner/repo/")

        assert resp.status_code == 404

    def test_returns_404_when_no_commits(
        self,
        client: TestClient,
        respx_mock: respx.MockRouter,
    ):
        """An empty commit list results in a 404."""
        respx_mock.get("/repos/owner/repo/commits").respond(json=[])

        resp = client.get("/badge/redirect/owner/repo/")

        assert resp.status_code == 404

    def test_returns_404_when_commit_has_only_non_coverage_status(
        self,
        client: TestClient,
        respx_mock: respx.MockRouter,
    ):
        """A commit whose only status is unrelated to coverage results in a 404."""
        commit = get_commit()
        respx_mock.get("/repos/owner/repo/commits").respond(json=[commit])
        respx_mock.get(f"/repos/owner/repo/statuses/{commit['sha']}").respond(
            json=[NON_COVERAGE_STATUS]
        )

        resp = client.get("/badge/redirect/owner/repo/")

        assert resp.status_code == 404

    # The respx_mock fixture is configured through this marker (rather than the
    # @respx.mock decorator) so the never-called route below is tolerated;
    # base_url is repeated because this marker replaces the module-level one
    # for this test.
    @pytest.mark.respx(base_url="https://api.github.com", assert_all_called=False)
    def test_skips_skip_ci_commits(
        self,
        client: TestClient,
        respx_mock: respx.MockRouter,
    ):
        """Statuses are fetched for the first commit without ``[skip ci]``."""
        skip_ci_commit = get_commit(skip_ci=True)
        commit_2 = get_commit()

        commits_route = respx_mock.get("/repos/owner/repo/commits").respond(
            json=[skip_ci_commit, commit_2]
        )
        commit_with_status_route = respx_mock.get(
            f"/repos/owner/repo/statuses/{commit_2['sha']}"
        ).respond(json=[COVERAGE_STATUS])
        # The route below should not be called since the commit has "skip ci"
        skip_ci_commit_status_route = respx_mock.get(
            f"/repos/owner/repo/statuses/{skip_ci_commit['sha']}"
        )

        resp = client.get("/badge/redirect/owner/repo/", follow_redirects=False)

        assert resp.status_code == 307
        assert resp.headers["location"] == COVERAGE_STATUS["target_url"]

        assert commits_route.called
        assert commit_with_status_route.called
        assert not skip_ci_commit_status_route.called

    # Marker-based fixture configuration: the skip-ci routes are registered
    # only to prove they are never requested.
    @pytest.mark.respx(base_url="https://api.github.com", assert_all_called=False)
    def test_check_up_to_5_commits(
        self,
        client: TestClient,
        respx_mock: respx.MockRouter,
    ):
        """Four ``[skip ci]`` commits are skipped and the fifth one is used."""
        commits = [get_commit(skip_ci=True) for _ in range(4)]
        commits.append(get_commit())

        respx_mock.get("/repos/owner/repo/commits").respond(json=commits)

        # Fifth commit has coverage status, the first 4 have "skip ci" and should be skipped
        respx_mock.get(f"/repos/owner/repo/statuses/{commits[4]['sha']}").respond(
            json=[COVERAGE_STATUS]
        )

        # The routes below should not be called since those commits have "skip ci"
        paths = tuple(
            f"/repos/owner/repo/statuses/{commit['sha']}" for commit in commits[:4]
        )
        skip_ci_commits_routes = respx_mock.route(path__in=paths)

        resp = client.get("/badge/redirect/owner/repo/", follow_redirects=False)

        assert resp.status_code == 307
        assert resp.headers["location"] == COVERAGE_STATUS["target_url"]

        assert not skip_ci_commits_routes.called

    # Marker-based fixture configuration: the second commit's route is
    # registered only to prove it is never requested.
    @pytest.mark.respx(base_url="https://api.github.com", assert_all_called=False)
    def test_stop_on_first_commit_without_skip_ci(
        self,
        client: TestClient,
        respx_mock: respx.MockRouter,
    ):
        """Only the first non-skip-ci commit is inspected, even without coverage."""
        commit_without_coverage_status = get_commit()
        commit_with_coverage_status = get_commit()

        respx_mock.get("/repos/owner/repo/commits").respond(
            json=[commit_without_coverage_status, commit_with_coverage_status]
        )
        respx_mock.get(
            f"/repos/owner/repo/statuses/{commit_without_coverage_status['sha']}"
        ).respond(json=[NON_COVERAGE_STATUS])

        # The route below should not be called
        commit_2_route = respx_mock.get(
            f"/repos/owner/repo/statuses/{commit_with_coverage_status['sha']}"
        )

        resp = client.get("/badge/redirect/owner/repo/")

        assert resp.status_code == 404
        assert not commit_2_route.called

    def test_retry_on_github_api_failure(
        self,
        client: TestClient,
        respx_mock: respx.MockRouter,
    ):
        """Both GitHub calls succeed on the third attempt after two failures."""
        commit = get_commit()
        commits_route = respx_mock.get("/repos/owner/repo/commits").mock(
            side_effect=[
                httpx.ConnectError("boom"),  # 1st call: network error
                httpx.Response(  # 2nd call: server error
                    500, json={"message": "server error"}
                ),
                httpx.Response(200, json=[commit]),  # 3rd call: success
            ]
        )

        status_route = respx_mock.get(
            f"/repos/owner/repo/statuses/{commit['sha']}"
        ).mock(
            side_effect=[
                httpx.ConnectError("boom"),  # 1st call: network error
                httpx.Response(  # 2nd call: server error
                    500, json={"message": "server error"}
                ),
                httpx.Response(200, json=[COVERAGE_STATUS]),  # 3rd call: success
            ]
        )

        resp = client.get("/badge/redirect/owner/repo/", follow_redirects=False)

        assert resp.status_code == 307
        assert resp.headers["location"] == COVERAGE_STATUS["target_url"]

        assert commits_route.call_count == 3
        assert status_route.call_count == 3

    def test_server_error_on_github_api_commit_route_failure(
        self,
        client: TestClient,
        respx_mock: respx.MockRouter,
    ):
        """A persistently failing commits endpoint raises after three attempts."""
        commits_route = respx_mock.get("/repos/owner/repo/commits").respond(
            status_code=500
        )

        with pytest.raises(httpx.HTTPStatusError):
            client.get("/badge/redirect/owner/repo/")

        assert commits_route.call_count == 3  # Retries up to 3 times on failure

    def test_server_error_on_github_api_status_route_failure(
        self,
        client: TestClient,
        respx_mock: respx.MockRouter,
    ):
        """A persistently failing statuses endpoint raises after three attempts."""
        commit = get_commit()
        respx_mock.get("/repos/owner/repo/commits").respond(json=[commit])
        status_route = respx_mock.get(
            f"/repos/owner/repo/statuses/{commit['sha']}"
        ).respond(status_code=500)

        with pytest.raises(httpx.HTTPStatusError):
            client.get("/badge/redirect/owner/repo/")

        assert status_route.call_count == 3  # Retries up to 3 times on failure