Coverage for backend / app / utils / aws_storage.py: 100%

68 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-02 15:51 +0000

1import json 

2from dataclasses import dataclass 

3from uuid import uuid4 

4from contextlib import AsyncExitStack 

5 

6from botocore.exceptions import ClientError 

7from pydantic import SecretStr 

8from aiobotocore.session import get_session 

9from types_aiobotocore_s3 import S3Client as AIOS3Client 

10 

11 

12class AWSStorageError(Exception): 

13 pass 

14 

15 

16@dataclass 

17class UploadSession: 

18 site_id: str 

19 bucket: str 

20 region: str 

21 access_key_id: str 

22 secret_access_key: str 

23 session_token: str 

24 

25 

26class AWSStorage: 

27 def __init__( 

28 self, 

29 access_key_id: str, 

30 secret_access_key: SecretStr, 

31 bucket: str, 

32 region: str, 

33 upload_role_arn: str, 

34 ): 

35 self._session = get_session() 

36 self._access_key_id = access_key_id 

37 self._secret_access_key = secret_access_key 

38 self._bucket = bucket 

39 self._region = region 

40 self._upload_role_arn = upload_role_arn 

41 self._exit_stack = AsyncExitStack() 

42 self._client: AIOS3Client | None = None 

43 

44 async def __aenter__(self): 

45 if self._client is not None: 

46 raise AWSStorageError("Client already initialized") 

47 

48 client = self._session.create_client( 

49 "s3", 

50 aws_access_key_id=self._access_key_id, 

51 aws_secret_access_key=self._secret_access_key.get_secret_value(), 

52 region_name=self._region, 

53 ) 

54 self._client = await self._exit_stack.enter_async_context(client) 

55 return self 

56 

57 async def __aexit__(self, exc_type, exc_val, exc_tb): 

58 await self._exit_stack.aclose() 

59 self._client = None 

60 

61 async def _generate_site_id(self) -> str: 

62 if self._client is None: 

63 raise AWSStorageError("Client not initialized") 

64 for _ in range(3): 

65 site_id = uuid4().hex[:12] 

66 full_key = f"sites/{site_id}/.keep" 

67 try: 

68 await self._client.put_object( 

69 Bucket=self._bucket, 

70 Key=full_key, 

71 Body=b"", 

72 IfNoneMatch="*", 

73 ) 

74 return site_id 

75 except ClientError as e: 

76 if e.response.get("Error", {}).get("Code") == "PreconditionFailed": 

77 continue 

78 raise AWSStorageError(f"Failed to create site directory: {e}") 

79 raise AWSStorageError("Failed to generate unique site ID after multiple attempts") 

80 

81 async def create_upload_session(self) -> UploadSession: 

82 site_id = await self._generate_site_id() 

83 

84 session_policy = json.dumps({ 

85 "Version": "2012-10-17", 

86 "Statement": [{ 

87 "Effect": "Allow", 

88 "Action": "s3:PutObject", 

89 "Resource": f"arn:aws:s3:::{self._bucket}/sites/{site_id}/*", 

90 }], 

91 }) 

92 

93 async with self._session.create_client( 

94 "sts", 

95 aws_access_key_id=self._access_key_id, 

96 aws_secret_access_key=self._secret_access_key.get_secret_value(), 

97 region_name=self._region, 

98 ) as sts_client: 

99 response = await sts_client.assume_role( 

100 RoleArn=self._upload_role_arn, 

101 RoleSessionName=f"upload-{site_id}", 

102 DurationSeconds=3600, 

103 Policy=session_policy, 

104 ) 

105 

106 credentials = response["Credentials"] 

107 return UploadSession( 

108 site_id=site_id, 

109 bucket=self._bucket, 

110 region=self._region, 

111 access_key_id=credentials["AccessKeyId"], 

112 secret_access_key=credentials["SecretAccessKey"], 

113 session_token=credentials["SessionToken"], 

114 ) 

115 

116 async def get_file(self, site_id: str, key: str) -> bytes: 

117 if self._client is None: 

118 raise AWSStorageError("Client not initialized") 

119 full_key = f"sites/{site_id}/{key}" 

120 try: 

121 res = await self._client.get_object(Bucket=self._bucket, Key=full_key) 

122 return await res["Body"].read() 

123 except ClientError as e: 

124 if e.response.get("Error", {}).get("Code") == "NoSuchKey": 

125 raise AWSStorageError(f"File not found: {full_key}") 

126 raise AWSStorageError(f"Failed to get file: {e}") # pragma: no cover