caselawclient.models.utilities.aws
import datetime
import json
import logging
import uuid
from collections.abc import Callable
from typing import Any, Literal, Optional, TypedDict, overload

import boto3
import botocore.client
import botocore.exceptions
import environ
from mypy_boto3_s3.client import S3Client
from mypy_boto3_s3.type_defs import CopySourceTypeDef, ObjectIdentifierTypeDef
from mypy_boto3_sns.client import SNSClient
from mypy_boto3_sns.type_defs import MessageAttributeValueTypeDef
from typing_extensions import NotRequired

from caselawclient.types import DocumentURIString

env = environ.Env()


class S3PrefixString(str):
    """A string which is guaranteed to end in "/", so that when used as an S3
    `Prefix` it behaves like a directory (see `uri_for_s3`)."""

    def __new__(cls, content: str) -> "S3PrefixString":
        # endswith() also covers the empty string, which previously raised a
        # confusing IndexError (from content[-1]) rather than this RuntimeError.
        if not content.endswith("/"):
            raise RuntimeError("S3 Prefixes must end in / so they behave like directories")
        return str.__new__(cls, content)


class ParserInstructionsMetadataDict(TypedDict):
    """Document metadata overrides sent to the parser; any field may be None."""

    name: Optional[str]
    cite: Optional[str]
    court: Optional[str]
    date: Optional[str]
    uri: Optional[str]


class ParserInstructionsDict(TypedDict):
    """Top-level parser instructions payload; both keys are optional."""

    documentType: NotRequired[Optional[str]]
    metadata: NotRequired[ParserInstructionsMetadataDict]


@overload
def create_aws_client(service: Literal["s3"]) -> S3Client: ...


@overload
def create_aws_client(service: Literal["sns"]) -> SNSClient: ...
def create_aws_client(service: Literal["s3", "sns"]) -> Any:
    """Create a boto3 client for `service`, in the private-asset bucket's region,
    signing requests with Signature Version 4."""
    aws = boto3.session.Session()
    return aws.client(
        service,
        region_name=env("PRIVATE_ASSET_BUCKET_REGION", default=None),
        config=botocore.client.Config(signature_version="s3v4"),
    )


def create_s3_client() -> S3Client:
    """Return a correctly-typed S3 client."""
    return create_aws_client("s3")


def create_sns_client() -> SNSClient:
    """Return a correctly-typed SNS client."""
    return create_aws_client("sns")


def uri_for_s3(uri: DocumentURIString) -> S3PrefixString:
    """An S3 Prefix must end with / to avoid uksc/2004/1 matching uksc/2004/1000"""
    return S3PrefixString(uri + "/")


def generate_signed_asset_url(
    key: str, force_download: bool = False, content_disposition_filename: None | str = None
) -> str:
    """Return a presigned GET URL for `key` in the private asset bucket.

    If `force_download` is set, the URL serves the object as an attachment named
    `content_disposition_filename` (defaulting to the final path component of `key`).
    Returns "" when PRIVATE_ASSET_BUCKET is unset.
    """
    if not content_disposition_filename:
        content_disposition_filename = key.rpartition("/")[-1]
    # If there isn't a PRIVATE_ASSET_BUCKET, don't try to get the bucket.
    # This helps local environment setup where we don't use S3.
    bucket = env("PRIVATE_ASSET_BUCKET", None)
    if not bucket:
        return ""

    client = create_s3_client()
    params = {"Bucket": bucket, "Key": key}
    if force_download:
        params["ResponseContentDisposition"] = f"attachment;filename={content_disposition_filename}"

    return str(
        client.generate_presigned_url(
            "get_object",
            Params=params,
        ),
    )


def check_docx_exists(uri: DocumentURIString) -> bool:
    """Does the docx for a document URI actually exist?

    Raises: re-raises any ClientError other than a 404 (a 403 is annotated
    with the offending key before re-raising).
    """
    bucket = env("PRIVATE_ASSET_BUCKET", None)
    s3_key = generate_docx_key(uri)
    client = create_s3_client()
    try:
        client.head_object(Bucket=bucket, Key=s3_key)
        return True
    except botocore.exceptions.ClientError as e:
        if e.response["Error"]["Code"] == "404":
            return False
        if e.response["Error"]["Code"] == "403":
            # was a plain string before, so the key was never interpolated
            e.add_note(f"403 on reading {s3_key}")
        raise


def generate_docx_key(uri: DocumentURIString) -> str:
    """from a canonical caselaw URI (eat/2022/1) return the S3 key of the associated docx"""
    return f"{uri}/{uri.replace('/', '_')}.docx"


def generate_docx_url(uri: DocumentURIString, force_download: bool = True) -> str:
    """from a canonical caselaw URI (eat/2022/1) return a signed S3 link for the front end"""
    return generate_signed_asset_url(generate_docx_key(uri), force_download=force_download)


def generate_pdf_url(uri: DocumentURIString, force_download: bool = False) -> str:
    """from a canonical caselaw URI (eat/2022/1) return a signed S3 link to the PDF"""
    key = f"{uri}/{uri.replace('/', '_')}.pdf"

    return generate_signed_asset_url(key, force_download=force_download)


def delete_from_bucket(uri: DocumentURIString, bucket: str) -> None:
    """Delete every object under `uri` in `bucket`."""
    delete_some_from_bucket(uri=uri, bucket=bucket, filter=lambda x: True)


def delete_some_from_bucket(
    uri: DocumentURIString, bucket: str, filter: Callable[[ObjectIdentifierTypeDef], bool]
) -> None:
    """Delete the objects under `uri` in `bucket` for which `filter` returns True.

    (`filter` shadows the builtin, but renaming it would break keyword callers.)
    """
    client = create_s3_client()
    # NOTE(review): list_objects returns at most 1000 keys per call; assumes a
    # document never has more assets than that — confirm if documents grow.
    response = client.list_objects(Bucket=bucket, Prefix=uri_for_s3(uri))

    if response.get("Contents"):
        objects_to_maybe_delete: list[ObjectIdentifierTypeDef] = [
            {"Key": obj["Key"]} for obj in response.get("Contents", [])
        ]
        objects_to_delete = [obj for obj in objects_to_maybe_delete if filter(obj)]
        client.delete_objects(
            Bucket=bucket,
            Delete={
                "Objects": objects_to_delete,
            },
        )


def delete_non_targz_from_bucket(uri: DocumentURIString, bucket: str) -> None:
    """Delete everything under `uri` in `bucket` except .tar.gz archives."""
    delete_some_from_bucket(uri=uri, bucket=bucket, filter=lambda x: not x["Key"].endswith(".tar.gz"))


def publish_documents(uri: DocumentURIString) -> None:
    """
    Copy assets from the unpublished bucket to the published one.
    Don't copy parser logs and package tar gz.
    TODO: consider refactoring with copy_assets
    """
    client = create_s3_client()

    public_bucket = env("PUBLIC_ASSET_BUCKET")
    private_bucket = env("PRIVATE_ASSET_BUCKET")

    response = client.list_objects(Bucket=private_bucket, Prefix=uri_for_s3(uri))

    for result in response.get("Contents", []):
        print(f"Contemplating copying {result!r}")
        key = str(result["Key"])

        if not key.endswith("parser.log") and not key.endswith(".tar.gz"):
            source: CopySourceTypeDef = {"Bucket": private_bucket, "Key": key}
            extra_args: dict[str, str] = {}
            try:
                print(f"Copying {key!r} from {private_bucket!r} to {public_bucket!r}")
                client.copy(source, public_bucket, key, extra_args)
            # ClientError lives in botocore.exceptions; botocore.client.ClientError
            # does not exist and raised AttributeError exactly when a copy failed
            except botocore.exceptions.ClientError as e:
                logging.warning(
                    f"Unable to copy file {key} to new location {public_bucket}, error: {e}",
                )


def unpublish_documents(uri: DocumentURIString) -> None:
    """Remove a document's assets from the public bucket."""
    delete_from_bucket(uri, env("PUBLIC_ASSET_BUCKET"))


def delete_documents_from_private_bucket(uri: DocumentURIString) -> None:
    """Remove a document's assets from the private bucket."""
    delete_from_bucket(uri, env("PRIVATE_ASSET_BUCKET"))


def announce_document_event(uri: DocumentURIString, status: str, enrich: bool = False) -> None:
    """Publish a document update event to the ANNOUNCE SNS topic.

    `status` becomes the `update_type` message attribute; `enrich` additionally
    sets `trigger_enrichment` = "1" so downstream enrichment is kicked off.
    """
    client = create_sns_client()

    message_attributes: dict[str, MessageAttributeValueTypeDef] = {}
    message_attributes["update_type"] = {
        "DataType": "String",
        "StringValue": status,
    }
    message_attributes["uri_reference"] = {
        "DataType": "String",
        "StringValue": uri,
    }
    if enrich:
        message_attributes["trigger_enrichment"] = {
            "DataType": "String",
            "StringValue": "1",
        }

    client.publish(
        TopicArn=env("SNS_TOPIC"),  # this is the ANNOUNCE SNS topic
        Message=json.dumps({"uri_reference": uri, "status": status}),
        Subject=f"Updated: {uri} {status}",
        MessageAttributes=message_attributes,
    )


def upload_asset_to_private_bucket(body: bytes, s3_key: str) -> None:
    """Upload an asset to the private bucket, tagged as a custom PDF source."""
    bucket: str = env("PRIVATE_ASSET_BUCKET")
    s3client = create_s3_client()
    s3client.put_object(Body=body, Bucket=bucket, Key=s3_key, Tagging="pdfsource=custom-pdfs")


def copy_assets(old_uri: DocumentURIString, new_uri: DocumentURIString) -> None:
    """
    Copy *unpublished* assets from one path to another,
    renaming DOCX and PDF files as appropriate.
    """
    client = create_s3_client()
    bucket = env("PRIVATE_ASSET_BUCKET")
    response = client.list_objects(Bucket=bucket, Prefix=uri_for_s3(old_uri))

    for result in response.get("Contents", []):
        old_key = str(result["Key"])
        new_key = build_new_key(old_key, new_uri)
        # defensive: build_new_key currently always returns a str
        if new_key is None:
            continue
        try:
            source: CopySourceTypeDef = {"Bucket": bucket, "Key": old_key}
            client.copy(source, bucket, new_key)
        # as in publish_documents: ClientError is in botocore.exceptions,
        # not botocore.client
        except botocore.exceptions.ClientError as e:
            logging.warning(
                f"Unable to copy file {old_key} to new location {new_key}, error: {e}",
            )


def are_unpublished_assets_clean(uri: DocumentURIString) -> bool:
    """Returns true if all non-tar.gz assets in the relevant S3 bucket have been cleaned
    (they have a DOCUMENT_PROCESSOR_VERSION tag)
    Note: if there are no assets, then this returns true."""
    client = create_s3_client()
    bucket = env("PRIVATE_ASSET_BUCKET")
    response = client.list_objects(Bucket=bucket, Prefix=uri_for_s3(uri))
    for result in response.get("Contents", []):
        file_key = str(result["Key"])
        # ignore original tar.gz files
        if file_key.endswith(".tar.gz"):
            continue

        # check if assets are tagged as being processed by S3
        tag_response = client.get_object_tagging(Bucket=bucket, Key=file_key)
        if not (any(tag["Key"] == "DOCUMENT_PROCESSOR_VERSION" for tag in tag_response["TagSet"])):
            return False
    return True


def build_new_key(old_key: str, new_uri: DocumentURIString) -> str:
    """Ensure that DOCX and PDF filenames are modified to reflect their new home
    as we get the name of the new S3 path"""
    old_filename = old_key.rsplit("/", 1)[-1]

    if old_filename.endswith(".docx") or old_filename.endswith(".pdf"):
        new_filename = new_uri.replace("/", "_")
        return f"{new_uri}/{new_filename}.{old_filename.split('.')[-1]}"
    return f"{new_uri}/{old_filename}"


def request_parse(
    uri: DocumentURIString,
    reference: Optional[str],
    parser_instructions: Optional[ParserInstructionsDict] = None,
) -> None:
    """Publish a TRE court-document parse request for `uri` to the reparse SNS topic.

    `reference` is the TRE reference; a truncated-uuid FCL reference is
    generated when it is falsy.
    """
    client = create_sns_client()

    if parser_instructions is None:
        parser_instructions = ParserInstructionsDict({})

    message_to_send = {
        "properties": {
            "messageType": "uk.gov.nationalarchives.da.messages.request.courtdocument.parse.RequestCourtDocumentParse",
            "timestamp": datetime.datetime.now(datetime.timezone.utc).isoformat().replace("+00:00", "Z"),
            "function": "fcl-judgment-parse-request",
            "producer": "FCL",
            "executionId": str(uuid.uuid4()),
            "parentExecutionId": None,
        },
        "parameters": {
            "s3Bucket": env("PRIVATE_ASSET_BUCKET"),
            "s3Key": generate_docx_key(uri),
            "reference": reference or f"FCL-{str(uuid.uuid4())[:-13]}",  # uuid truncated at request of TRE
            "originator": "FCL",
            "parserInstructions": parser_instructions,
        },
    }

    client.publish(
        TopicArn=env("REPARSE_SNS_TOPIC"),
        Message=json.dumps(message_to_send),
        Subject=f"Reparse request: {uri}",
    )
23class S3PrefixString(str): 24 def __new__(cls, content: str) -> "S3PrefixString": 25 if content[-1] != "/": 26 raise RuntimeError("S3 Prefixes must end in / so they behave like directories") 27 return str.__new__(cls, content)
str(object='') -> str str(bytes_or_buffer[, encoding[, errors]]) -> str
Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.__str__() (if defined) or repr(object). encoding defaults to sys.getdefaultencoding(). errors defaults to 'strict'.
38class ParserInstructionsDict(TypedDict): 39 documentType: NotRequired[Optional[str]] 40 metadata: NotRequired[ParserInstructionsMetadataDict]
68def uri_for_s3(uri: DocumentURIString) -> S3PrefixString: 69 """An S3 Prefix must end with / to avoid uksc/2004/1 matching uksc/2004/1000""" 70 return S3PrefixString(uri + "/")
An S3 Prefix must end with / to avoid uksc/2004/1 matching uksc/2004/1000
73def generate_signed_asset_url( 74 key: str, force_download: bool = False, content_disposition_filename: None | str = None 75) -> str: 76 # If there isn't a PRIVATE_ASSET_BUCKET, don't try to get the bucket. 77 # This helps local environment setup where we don't use S3. 78 if not content_disposition_filename: 79 content_disposition_filename = key.rpartition("/")[-1] 80 bucket = env("PRIVATE_ASSET_BUCKET", None) 81 if not bucket: 82 return "" 83 84 client = create_s3_client() 85 params = {"Bucket": bucket, "Key": key} 86 if force_download: 87 params["ResponseContentDisposition"] = f"attachment;filename={content_disposition_filename}" 88 89 return str( 90 client.generate_presigned_url( 91 "get_object", 92 Params=params, 93 ), 94 )
97def check_docx_exists(uri: DocumentURIString) -> bool: 98 """Does the docx for a document URI actually exist?""" 99 bucket = env("PRIVATE_ASSET_BUCKET", None) 100 s3_key = generate_docx_key(uri) 101 client = create_s3_client() 102 try: 103 client.head_object(Bucket=bucket, Key=s3_key) 104 return True 105 except botocore.exceptions.ClientError as e: 106 if e.response["Error"]["Code"] == "404": 107 return False 108 if e.response["Error"]["Code"] == "403": 109 e.add_note("403 on reading {s3_key}") 110 raise
Does the docx for a document URI actually exist?
113def generate_docx_key(uri: DocumentURIString) -> str: 114 """from a canonical caselaw URI (eat/2022/1) return the S3 key of the associated docx""" 115 return f"{uri}/{uri.replace('/', '_')}.docx"
from a canonical caselaw URI (eat/2022/1) return the S3 key of the associated docx
118def generate_docx_url(uri: DocumentURIString, force_download: bool = True) -> str: 119 """from a canonical caselaw URI (eat/2022/1) return a signed S3 link for the front end""" 120 return generate_signed_asset_url(generate_docx_key(uri), force_download=force_download)
from a canonical caselaw URI (eat/2022/1) return a signed S3 link for the front end
133def delete_some_from_bucket( 134 uri: DocumentURIString, bucket: str, filter: Callable[[ObjectIdentifierTypeDef], bool] 135) -> None: 136 client = create_s3_client() 137 response = client.list_objects(Bucket=bucket, Prefix=uri_for_s3(uri)) 138 139 if response.get("Contents"): 140 objects_to_maybe_delete: list[ObjectIdentifierTypeDef] = [ 141 {"Key": obj["Key"]} for obj in response.get("Contents", []) 142 ] 143 objects_to_delete = [obj for obj in objects_to_maybe_delete if filter(obj)] 144 client.delete_objects( 145 Bucket=bucket, 146 Delete={ 147 "Objects": objects_to_delete, 148 }, 149 )
156def publish_documents(uri: DocumentURIString) -> None: 157 """ 158 Copy assets from the unpublished bucket to the published one. 159 Don't copy parser logs and package tar gz. 160 TODO: consider refactoring with copy_assets 161 """ 162 client = create_s3_client() 163 164 public_bucket = env("PUBLIC_ASSET_BUCKET") 165 private_bucket = env("PRIVATE_ASSET_BUCKET") 166 167 response = client.list_objects(Bucket=private_bucket, Prefix=uri_for_s3(uri)) 168 169 for result in response.get("Contents", []): 170 print(f"Contemplating copying {result!r}") 171 key = str(result["Key"]) 172 173 if not key.endswith("parser.log") and not key.endswith(".tar.gz"): 174 source: CopySourceTypeDef = {"Bucket": private_bucket, "Key": key} 175 extra_args: dict[str, str] = {} 176 try: 177 print(f"Copying {key!r} from {private_bucket!r} to {public_bucket!r}") 178 client.copy(source, public_bucket, key, extra_args) 179 except botocore.client.ClientError as e: 180 logging.warning( 181 f"Unable to copy file {key} to new location {public_bucket}, error: {e}", 182 )
Copy assets from the unpublished bucket to the published one. Don't copy parser logs and package tar gz. TODO: consider refactoring with copy_assets
193def announce_document_event(uri: DocumentURIString, status: str, enrich: bool = False) -> None: 194 client = create_sns_client() 195 196 message_attributes: dict[str, MessageAttributeValueTypeDef] = {} 197 message_attributes["update_type"] = { 198 "DataType": "String", 199 "StringValue": status, 200 } 201 message_attributes["uri_reference"] = { 202 "DataType": "String", 203 "StringValue": uri, 204 } 205 if enrich: 206 message_attributes["trigger_enrichment"] = { 207 "DataType": "String", 208 "StringValue": "1", 209 } 210 211 client.publish( 212 TopicArn=env("SNS_TOPIC"), # this is the ANNOUNCE SNS topic 213 Message=json.dumps({"uri_reference": uri, "status": status}), 214 Subject=f"Updated: {uri} {status}", 215 MessageAttributes=message_attributes, 216 )
219def upload_asset_to_private_bucket(body: bytes, s3_key: str) -> None: 220 """Upload an asset to the private bucket.""" 221 bucket: str = env("PRIVATE_ASSET_BUCKET") 222 s3client = create_s3_client() 223 s3client.put_object(Body=body, Bucket=bucket, Key=s3_key, Tagging="pdfsource=custom-pdfs")
Upload an asset to the private bucket.
226def copy_assets(old_uri: DocumentURIString, new_uri: DocumentURIString) -> None: 227 """ 228 Copy *unpublished* assets from one path to another, 229 renaming DOCX and PDF files as appropriate. 230 """ 231 client = create_s3_client() 232 bucket = env("PRIVATE_ASSET_BUCKET") 233 response = client.list_objects(Bucket=bucket, Prefix=uri_for_s3(old_uri)) 234 235 for result in response.get("Contents", []): 236 old_key = str(result["Key"]) 237 new_key = build_new_key(old_key, new_uri) 238 if new_key is None: 239 continue 240 try: 241 source: CopySourceTypeDef = {"Bucket": bucket, "Key": old_key} 242 client.copy(source, bucket, new_key) 243 except botocore.client.ClientError as e: 244 logging.warning( 245 f"Unable to copy file {old_key} to new location {new_key}, error: {e}", 246 )
Copy unpublished assets from one path to another, renaming DOCX and PDF files as appropriate.
249def are_unpublished_assets_clean(uri: DocumentURIString) -> bool: 250 """Returns true if all non-tar.gz assets in the relevant S3 bucket have been cleaned 251 (they have a DOCUMENT_PROCESSOR_VERSION tag) 252 Note: if there are no assets, then this returns true.""" 253 client = create_s3_client() 254 bucket = env("PRIVATE_ASSET_BUCKET") 255 response = client.list_objects(Bucket=bucket, Prefix=uri_for_s3(uri)) 256 for result in response.get("Contents", []): 257 file_key = str(result["Key"]) 258 # ignore original tar.gz files 259 if file_key.endswith(".tar.gz"): 260 continue 261 262 # check if assets are tagged as being processed by S3 263 tag_response = client.get_object_tagging(Bucket=bucket, Key=file_key) 264 if not (any(tag["Key"] == "DOCUMENT_PROCESSOR_VERSION" for tag in tag_response["TagSet"])): 265 return False 266 return True
Returns true if all non-tar.gz assets in the relevant S3 bucket have been cleaned (they have a DOCUMENT_PROCESSOR_VERSION tag) Note: if there are no assets, then this returns true.
269def build_new_key(old_key: str, new_uri: DocumentURIString) -> str: 270 """Ensure that DOCX and PDF filenames are modified to reflect their new home 271 as we get the name of the new S3 path""" 272 old_filename = old_key.rsplit("/", 1)[-1] 273 274 if old_filename.endswith(".docx") or old_filename.endswith(".pdf"): 275 new_filename = new_uri.replace("/", "_") 276 return f"{new_uri}/{new_filename}.{old_filename.split('.')[-1]}" 277 return f"{new_uri}/{old_filename}"
Ensure that DOCX and PDF filenames are modified to reflect their new home as we get the name of the new S3 path
280def request_parse( 281 uri: DocumentURIString, 282 reference: Optional[str], 283 parser_instructions: Optional[ParserInstructionsDict] = None, 284) -> None: 285 client = create_sns_client() 286 287 if parser_instructions is None: 288 parser_instructions = ParserInstructionsDict({}) 289 290 message_to_send = { 291 "properties": { 292 "messageType": "uk.gov.nationalarchives.da.messages.request.courtdocument.parse.RequestCourtDocumentParse", 293 "timestamp": datetime.datetime.now(datetime.timezone.utc).isoformat().replace("+00:00", "Z"), 294 "function": "fcl-judgment-parse-request", 295 "producer": "FCL", 296 "executionId": str(uuid.uuid4()), 297 "parentExecutionId": None, 298 }, 299 "parameters": { 300 "s3Bucket": env("PRIVATE_ASSET_BUCKET"), 301 "s3Key": generate_docx_key(uri), 302 "reference": reference or f"FCL-{str(uuid.uuid4())[:-13]}", # uuid truncated at request of TRE 303 "originator": "FCL", 304 "parserInstructions": parser_instructions, 305 }, 306 } 307 308 client.publish( 309 TopicArn=env("REPARSE_SNS_TOPIC"), 310 Message=json.dumps(message_to_send), 311 Subject=f"Reparse request: {uri}", 312 )