caselawclient.models.utilities.aws

  1import datetime
  2import json
  3import logging
  4import uuid
  5from collections.abc import Callable
  6from typing import Any, Literal, Optional, TypedDict, overload
  7
  8import boto3
  9import botocore.client
 10import environ
 11from mypy_boto3_s3.client import S3Client
 12from mypy_boto3_s3.type_defs import CopySourceTypeDef, ObjectIdentifierTypeDef
 13from mypy_boto3_sns.client import SNSClient
 14from mypy_boto3_sns.type_defs import MessageAttributeValueTypeDef
 15from typing_extensions import NotRequired
 16
 17from caselawclient.types import DocumentURIString
 18
 19env = environ.Env()
 20
 21
 22class S3PrefixString(str):
 23    def __new__(cls, content: str) -> "S3PrefixString":
 24        if content[-1] != "/":
 25            raise RuntimeError("S3 Prefixes must end in / so they behave like directories")
 26        return str.__new__(cls, content)
 27
 28
class ParserInstructionsMetadataDict(TypedDict):
    """Metadata block that can be nested inside a ParserInstructionsDict.

    Every key is required, but each value may be None.
    """

    # NOTE(review): the meanings below are inferred from the key names only —
    # confirm against the parser's own schema.
    name: Optional[str]  # presumably the document name
    cite: Optional[str]  # presumably the citation
    court: Optional[str]  # presumably the court identifier
    date: Optional[str]  # presumably a date, as a string; format not shown here
    uri: Optional[str]  # presumably the document URI
 35
 36
class ParserInstructionsDict(TypedDict):
    """Instructions sent as the "parserInstructions" value of a parse request.

    Both keys are optional (NotRequired); an empty dict is a valid value.
    """

    # May be omitted entirely, or present with a None value.
    documentType: NotRequired[Optional[str]]
    # May be omitted; when present it must carry every metadata key.
    metadata: NotRequired[ParserInstructionsMetadataDict]
 40
 41
 42@overload
 43def create_aws_client(service: Literal["s3"]) -> S3Client: ...
 44
 45
 46@overload
 47def create_aws_client(service: Literal["sns"]) -> SNSClient: ...
 48
 49
 50def create_aws_client(service: Literal["s3", "sns"]) -> Any:
 51    aws = boto3.session.Session()
 52    return aws.client(
 53        service,
 54        region_name=env("PRIVATE_ASSET_BUCKET_REGION", default=None),
 55        config=botocore.client.Config(signature_version="s3v4"),
 56    )
 57
 58
 59def create_s3_client() -> S3Client:
 60    return create_aws_client("s3")
 61
 62
 63def create_sns_client() -> SNSClient:
 64    return create_aws_client("sns")
 65
 66
 67def uri_for_s3(uri: DocumentURIString) -> S3PrefixString:
 68    """An S3 Prefix must end with / to avoid uksc/2004/1 matching uksc/2004/1000"""
 69    return S3PrefixString(uri + "/")
 70
 71
 72def generate_signed_asset_url(
 73    key: str, force_download: bool = False, content_disposition_filename: None | str = None
 74) -> str:
 75    # If there isn't a PRIVATE_ASSET_BUCKET, don't try to get the bucket.
 76    # This helps local environment setup where we don't use S3.
 77    if not content_disposition_filename:
 78        content_disposition_filename = key.rpartition("/")[-1]
 79    bucket = env("PRIVATE_ASSET_BUCKET", None)
 80    if not bucket:
 81        return ""
 82
 83    client = create_s3_client()
 84    params = {"Bucket": bucket, "Key": key}
 85    if force_download:
 86        params["ResponseContentDisposition"] = f"attachment;filename={content_disposition_filename}"
 87
 88    return str(
 89        client.generate_presigned_url(
 90            "get_object",
 91            Params=params,
 92        ),
 93    )
 94
 95
 96def check_docx_exists(uri: DocumentURIString) -> bool:
 97    """Does the docx for a document URI actually exist?"""
 98    bucket = env("PRIVATE_ASSET_BUCKET", None)
 99    s3_key = generate_docx_key(uri)
100    client = create_s3_client()
101    try:
102        client.head_object(Bucket=bucket, Key=s3_key)
103        return True
104    except botocore.exceptions.ClientError as e:
105        if e.response["Error"]["Code"] == "404":
106            return False
107        if e.response["Error"]["Code"] == "403":
108            e.add_note("403 on reading {s3_key}")
109        raise
110
111
def generate_docx_key(uri: DocumentURIString) -> str:
    """Return the S3 key of the docx belonging to a canonical caselaw URI.

    For eat/2022/1 the key is eat/2022/1/eat_2022_1.docx.
    """
    flattened = uri.replace("/", "_")
    return f"{uri}/{flattened}.docx"
115
116
def generate_docx_url(uri: DocumentURIString, force_download: bool = True) -> str:
    """Return a signed S3 link, for the front end, to the docx of a canonical caselaw URI (eat/2022/1)."""
    docx_key = generate_docx_key(uri)
    return generate_signed_asset_url(docx_key, force_download=force_download)
120
121
def generate_pdf_url(uri: DocumentURIString, force_download: bool = False) -> str:
    """Return a signed S3 link to the PDF stored under a document URI.

    The key has the form <uri>/<uri with "/" replaced by "_">.pdf.
    """
    flattened = uri.replace("/", "_")
    return generate_signed_asset_url(f"{uri}/{flattened}.pdf", force_download=force_download)
126
127
def delete_from_bucket(uri: DocumentURIString, bucket: str) -> None:
    """Delete every listed object under the document's prefix in ``bucket``."""

    def select_all(_obj: ObjectIdentifierTypeDef) -> bool:
        return True

    delete_some_from_bucket(uri=uri, bucket=bucket, filter=select_all)
130
131
def delete_some_from_bucket(
    uri: DocumentURIString, bucket: str, filter: Callable[[ObjectIdentifierTypeDef], bool]
) -> None:
    """Delete the objects under a document's prefix that ``filter`` selects.

    Args:
        uri: document URI; its S3 prefix (with trailing slash) is listed.
        bucket: name of the bucket to list and delete from.
        filter: called with ``{"Key": <object key>}`` for each listed object;
            objects for which it returns a truthy value are deleted.

    Nothing is sent to S3 when no object is selected.
    """
    client = create_s3_client()
    response = client.list_objects(Bucket=bucket, Prefix=uri_for_s3(uri))
    # NOTE(review): a single list_objects call returns at most 1000 keys;
    # larger prefixes would need pagination — confirm whether that can occur.

    candidates: list[ObjectIdentifierTypeDef] = [
        {"Key": obj["Key"]} for obj in response.get("Contents") or []
    ]
    objects_to_delete = [obj for obj in candidates if filter(obj)]

    # S3 rejects a DeleteObjects request with an empty object list, so bail
    # out when the filter rejected everything (or nothing was listed).
    if not objects_to_delete:
        return

    client.delete_objects(
        Bucket=bucket,
        Delete={
            "Objects": objects_to_delete,
        },
    )
149
150
def delete_non_targz_from_bucket(uri: DocumentURIString, bucket: str) -> None:
    """Delete the objects under the document's prefix whose key does not end in .tar.gz."""

    def is_not_targz(obj: ObjectIdentifierTypeDef) -> bool:
        return not obj["Key"].endswith(".tar.gz")

    delete_some_from_bucket(uri=uri, bucket=bucket, filter=is_not_targz)
153
154
def publish_documents(uri: DocumentURIString) -> None:
    """
    Copy a document's assets from the private (unpublished) bucket to the public one.
    Keys ending in parser.log or .tar.gz are skipped.
    A copy that fails with a ClientError is logged as a warning and the loop carries on.
    TODO: consider refactoring with copy_assets
    """
    client = create_s3_client()

    public_bucket = env("PUBLIC_ASSET_BUCKET")
    private_bucket = env("PRIVATE_ASSET_BUCKET")

    listing = client.list_objects(Bucket=private_bucket, Prefix=uri_for_s3(uri))
    skipped_suffixes = ("parser.log", ".tar.gz")

    for result in listing.get("Contents", []):
        print(f"Contemplating copying {result!r}")
        key = str(result["Key"])

        if key.endswith(skipped_suffixes):
            continue

        source: CopySourceTypeDef = {"Bucket": private_bucket, "Key": key}
        extra_args: dict[str, str] = {}
        try:
            print(f"Copying {key!r} from {private_bucket!r} to {public_bucket!r}")
            client.copy(source, public_bucket, key, extra_args)
        except botocore.client.ClientError as e:
            logging.warning(
                f"Unable to copy file {key} to new location {public_bucket}, error: {e}",
            )
182
183
def unpublish_documents(uri: DocumentURIString) -> None:
    """Delete the document's assets from the bucket named by PUBLIC_ASSET_BUCKET."""
    public_bucket = env("PUBLIC_ASSET_BUCKET")
    delete_from_bucket(uri, public_bucket)
186
187
def delete_documents_from_private_bucket(uri: DocumentURIString) -> None:
    """Delete the document's assets from the bucket named by PRIVATE_ASSET_BUCKET."""
    private_bucket = env("PRIVATE_ASSET_BUCKET")
    delete_from_bucket(uri, private_bucket)
190
191
def announce_document_event(uri: DocumentURIString, status: str, enrich: bool = False) -> None:
    """Publish a document update message to the SNS topic named by SNS_TOPIC.

    The message body is JSON holding the URI and status. The same two values
    are also sent as the "uri_reference" and "update_type" message attributes,
    and "trigger_enrichment" is added (as "1") when ``enrich`` is true.
    """
    client = create_sns_client()

    def string_attribute(value: str) -> MessageAttributeValueTypeDef:
        return {"DataType": "String", "StringValue": value}

    message_attributes: dict[str, MessageAttributeValueTypeDef] = {
        "update_type": string_attribute(status),
        "uri_reference": string_attribute(uri),
    }
    if enrich:
        message_attributes["trigger_enrichment"] = string_attribute("1")

    body = json.dumps({"uri_reference": uri, "status": status})
    client.publish(
        TopicArn=env("SNS_TOPIC"),  # this is the ANNOUNCE SNS topic
        Message=body,
        Subject=f"Updated: {uri} {status}",
        MessageAttributes=message_attributes,
    )
216
217
def upload_asset_to_private_bucket(body: bytes, s3_key: str) -> None:
    """Store ``body`` at ``s3_key`` in the private bucket, tagged pdfsource=custom-pdfs."""
    bucket: str = env("PRIVATE_ASSET_BUCKET")
    create_s3_client().put_object(
        Body=body,
        Bucket=bucket,
        Key=s3_key,
        Tagging="pdfsource=custom-pdfs",
    )
223
224
def copy_assets(old_uri: DocumentURIString, new_uri: DocumentURIString) -> None:
    """
    Copy *unpublished* assets (those in the private bucket) from the old URI's
    prefix to the new URI's, with DOCX and PDF files renamed to match the new URI.
    A copy that fails with a ClientError is logged as a warning and the loop carries on.
    """
    client = create_s3_client()
    bucket = env("PRIVATE_ASSET_BUCKET")
    listing = client.list_objects(Bucket=bucket, Prefix=uri_for_s3(old_uri))

    for entry in listing.get("Contents", []):
        old_key = str(entry["Key"])
        new_key = build_new_key(old_key, new_uri)
        if new_key is not None:
            source: CopySourceTypeDef = {"Bucket": bucket, "Key": old_key}
            try:
                client.copy(source, bucket, new_key)
            except botocore.client.ClientError as e:
                logging.warning(
                    f"Unable to copy file {old_key} to new location {new_key}, error: {e}",
                )
246
247
def are_unpublished_assets_clean(uri: DocumentURIString) -> bool:
    """Report whether every non-tar.gz asset under the URI in the private bucket
    carries a DOCUMENT_PROCESSOR_VERSION tag (i.e. has been cleaned).
    Note: with no assets at all the answer is True."""
    client = create_s3_client()
    bucket = env("PRIVATE_ASSET_BUCKET")
    listing = client.list_objects(Bucket=bucket, Prefix=uri_for_s3(uri))

    def has_processor_tag(file_key: str) -> bool:
        tag_set = client.get_object_tagging(Bucket=bucket, Key=file_key)["TagSet"]
        return any(tag["Key"] == "DOCUMENT_PROCESSOR_VERSION" for tag in tag_set)

    asset_keys = (str(entry["Key"]) for entry in listing.get("Contents", []))
    # Original tar.gz files are ignored; all() stops at the first untagged asset.
    return all(has_processor_tag(file_key) for file_key in asset_keys if not file_key.endswith(".tar.gz"))
266
267
def build_new_key(old_key: str, new_uri: DocumentURIString) -> str:
    """Work out the S3 key an asset should have under ``new_uri``.

    DOCX and PDF files are renamed after the new URI (with "/" replaced by "_"),
    keeping their extension; any other file keeps its original filename."""
    filename = old_key.rpartition("/")[-1]

    if filename.endswith((".docx", ".pdf")):
        extension = filename.rpartition(".")[-1]
        filename = f"{new_uri.replace('/', '_')}.{extension}"

    return f"{new_uri}/{filename}"
277
278
def request_parse(
    uri: DocumentURIString,
    reference: Optional[str],
    parser_instructions: Optional[ParserInstructionsDict] = None,
) -> None:
    """Publish a parse request for the document's docx to the SNS topic named by REPARSE_SNS_TOPIC.

    The message points at the docx key in the private bucket. When ``reference``
    is falsy, one is generated as "FCL-" plus a truncated UUID. When
    ``parser_instructions`` is None, an empty instructions dict is sent.
    """
    client = create_sns_client()

    instructions = ParserInstructionsDict({}) if parser_instructions is None else parser_instructions

    now_utc = datetime.datetime.now(datetime.timezone.utc)
    properties = {
        "messageType": "uk.gov.nationalarchives.da.messages.request.courtdocument.parse.RequestCourtDocumentParse",
        "timestamp": now_utc.isoformat().replace("+00:00", "Z"),
        "function": "fcl-judgment-parse-request",
        "producer": "FCL",
        "executionId": str(uuid.uuid4()),
        "parentExecutionId": None,
    }
    parameters = {
        "s3Bucket": env("PRIVATE_ASSET_BUCKET"),
        "s3Key": generate_docx_key(uri),
        "reference": reference or f"FCL-{str(uuid.uuid4())[:-13]}",  # uuid truncated at request of TRE
        "originator": "FCL",
        "parserInstructions": instructions,
    }
    message_to_send = {"properties": properties, "parameters": parameters}

    client.publish(
        TopicArn=env("REPARSE_SNS_TOPIC"),
        Message=json.dumps(message_to_send),
        Subject=f"Reparse request: {uri}",
    )
env = <environ.environ.Env object>
class S3PrefixString(builtins.str):
class S3PrefixString(str):
    """A string that is guaranteed to end in "/" so it can be used as an S3 prefix.

    Raises:
        RuntimeError: if ``content`` does not end with "/" (this includes the
            empty string).
    """

    def __new__(cls, content: str) -> "S3PrefixString":
        # endswith() rather than content[-1]: an empty string must be rejected
        # with the same RuntimeError instead of escaping as an IndexError.
        if not content.endswith("/"):
            raise RuntimeError("S3 Prefixes must end in / so they behave like directories")
        return str.__new__(cls, content)

str(object='') -> str str(bytes_or_buffer[, encoding[, errors]]) -> str

Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.__str__() (if defined) or repr(object). encoding defaults to sys.getdefaultencoding(). errors defaults to 'strict'.

class ParserInstructionsMetadataDict(typing.TypedDict):
30class ParserInstructionsMetadataDict(TypedDict):
31    name: Optional[str]
32    cite: Optional[str]
33    court: Optional[str]
34    date: Optional[str]
35    uri: Optional[str]
name: Optional[str]
cite: Optional[str]
court: Optional[str]
date: Optional[str]
uri: Optional[str]
class ParserInstructionsDict(typing.TypedDict):
38class ParserInstructionsDict(TypedDict):
39    documentType: NotRequired[Optional[str]]
40    metadata: NotRequired[ParserInstructionsMetadataDict]
documentType: NotRequired[Optional[str]]
metadata: NotRequired[ParserInstructionsMetadataDict]
def create_aws_client(service: Literal['s3', 'sns']) -> Any:
51def create_aws_client(service: Literal["s3", "sns"]) -> Any:
52    aws = boto3.session.Session()
53    return aws.client(
54        service,
55        region_name=env("PRIVATE_ASSET_BUCKET_REGION", default=None),
56        config=botocore.client.Config(signature_version="s3v4"),
57    )
def create_s3_client() -> mypy_boto3_s3.client.S3Client:
60def create_s3_client() -> S3Client:
61    return create_aws_client("s3")
def create_sns_client() -> mypy_boto3_sns.client.SNSClient:
64def create_sns_client() -> SNSClient:
65    return create_aws_client("sns")
def uri_for_s3( uri: caselawclient.types.DocumentURIString) -> S3PrefixString:
68def uri_for_s3(uri: DocumentURIString) -> S3PrefixString:
69    """An S3 Prefix must end with / to avoid uksc/2004/1 matching uksc/2004/1000"""
70    return S3PrefixString(uri + "/")

An S3 Prefix must end with / to avoid uksc/2004/1 matching uksc/2004/1000

def generate_signed_asset_url( key: str, force_download: bool = False, content_disposition_filename: None | str = None) -> str:
73def generate_signed_asset_url(
74    key: str, force_download: bool = False, content_disposition_filename: None | str = None
75) -> str:
76    # If there isn't a PRIVATE_ASSET_BUCKET, don't try to get the bucket.
77    # This helps local environment setup where we don't use S3.
78    if not content_disposition_filename:
79        content_disposition_filename = key.rpartition("/")[-1]
80    bucket = env("PRIVATE_ASSET_BUCKET", None)
81    if not bucket:
82        return ""
83
84    client = create_s3_client()
85    params = {"Bucket": bucket, "Key": key}
86    if force_download:
87        params["ResponseContentDisposition"] = f"attachment;filename={content_disposition_filename}"
88
89    return str(
90        client.generate_presigned_url(
91            "get_object",
92            Params=params,
93        ),
94    )
def check_docx_exists(uri: caselawclient.types.DocumentURIString) -> bool:
 97def check_docx_exists(uri: DocumentURIString) -> bool:
 98    """Does the docx for a document URI actually exist?"""
 99    bucket = env("PRIVATE_ASSET_BUCKET", None)
100    s3_key = generate_docx_key(uri)
101    client = create_s3_client()
102    try:
103        client.head_object(Bucket=bucket, Key=s3_key)
104        return True
105    except botocore.exceptions.ClientError as e:
106        if e.response["Error"]["Code"] == "404":
107            return False
108        if e.response["Error"]["Code"] == "403":
109            e.add_note("403 on reading {s3_key}")
110        raise

Does the docx for a document URI actually exist?

def generate_docx_key(uri: caselawclient.types.DocumentURIString) -> str:
113def generate_docx_key(uri: DocumentURIString) -> str:
114    """from a canonical caselaw URI (eat/2022/1) return the S3 key of the associated docx"""
115    return f"{uri}/{uri.replace('/', '_')}.docx"

From a canonical caselaw URI (e.g. eat/2022/1), return the S3 key of the associated docx.

def generate_docx_url( uri: caselawclient.types.DocumentURIString, force_download: bool = True) -> str:
118def generate_docx_url(uri: DocumentURIString, force_download: bool = True) -> str:
119    """from a canonical caselaw URI (eat/2022/1) return a signed S3 link for the front end"""
120    return generate_signed_asset_url(generate_docx_key(uri), force_download=force_download)

From a canonical caselaw URI (e.g. eat/2022/1), return a signed S3 link for the front end.

def generate_pdf_url( uri: caselawclient.types.DocumentURIString, force_download: bool = False) -> str:
123def generate_pdf_url(uri: DocumentURIString, force_download: bool = False) -> str:
124    key = f"{uri}/{uri.replace('/', '_')}.pdf"
125
126    return generate_signed_asset_url(key, force_download=force_download)
def delete_from_bucket(uri: caselawclient.types.DocumentURIString, bucket: str) -> None:
129def delete_from_bucket(uri: DocumentURIString, bucket: str) -> None:
130    delete_some_from_bucket(uri=uri, bucket=bucket, filter=lambda x: True)
def delete_some_from_bucket( uri: caselawclient.types.DocumentURIString, bucket: str, filter: Callable[[mypy_boto3_s3.type_defs.ObjectIdentifierTypeDef], bool]) -> None:
def delete_some_from_bucket(
    uri: DocumentURIString, bucket: str, filter: Callable[[ObjectIdentifierTypeDef], bool]
) -> None:
    """Delete the objects under a document's prefix that ``filter`` selects.

    Args:
        uri: document URI; its S3 prefix (with trailing slash) is listed.
        bucket: name of the bucket to list and delete from.
        filter: called with ``{"Key": <object key>}`` for each listed object;
            objects for which it returns a truthy value are deleted.

    Nothing is sent to S3 when no object is selected.
    """
    client = create_s3_client()
    response = client.list_objects(Bucket=bucket, Prefix=uri_for_s3(uri))
    # NOTE(review): a single list_objects call returns at most 1000 keys;
    # larger prefixes would need pagination — confirm whether that can occur.

    candidates: list[ObjectIdentifierTypeDef] = [
        {"Key": obj["Key"]} for obj in response.get("Contents") or []
    ]
    objects_to_delete = [obj for obj in candidates if filter(obj)]

    # S3 rejects a DeleteObjects request with an empty object list, so bail
    # out when the filter rejected everything (or nothing was listed).
    if not objects_to_delete:
        return

    client.delete_objects(
        Bucket=bucket,
        Delete={
            "Objects": objects_to_delete,
        },
    )
def delete_non_targz_from_bucket(uri: caselawclient.types.DocumentURIString, bucket: str) -> None:
152def delete_non_targz_from_bucket(uri: DocumentURIString, bucket: str) -> None:
153    delete_some_from_bucket(uri=uri, bucket=bucket, filter=lambda x: not x["Key"].endswith(".tar.gz"))
def publish_documents(uri: caselawclient.types.DocumentURIString) -> None:
156def publish_documents(uri: DocumentURIString) -> None:
157    """
158    Copy assets from the unpublished bucket to the published one.
159    Don't copy parser logs and package tar gz.
160    TODO: consider refactoring with copy_assets
161    """
162    client = create_s3_client()
163
164    public_bucket = env("PUBLIC_ASSET_BUCKET")
165    private_bucket = env("PRIVATE_ASSET_BUCKET")
166
167    response = client.list_objects(Bucket=private_bucket, Prefix=uri_for_s3(uri))
168
169    for result in response.get("Contents", []):
170        print(f"Contemplating copying {result!r}")
171        key = str(result["Key"])
172
173        if not key.endswith("parser.log") and not key.endswith(".tar.gz"):
174            source: CopySourceTypeDef = {"Bucket": private_bucket, "Key": key}
175            extra_args: dict[str, str] = {}
176            try:
177                print(f"Copying {key!r} from {private_bucket!r} to {public_bucket!r}")
178                client.copy(source, public_bucket, key, extra_args)
179            except botocore.client.ClientError as e:
180                logging.warning(
181                    f"Unable to copy file {key} to new location {public_bucket}, error: {e}",
182                )

Copy assets from the unpublished bucket to the published one. Don't copy parser logs and package tar gz. TODO: consider refactoring with copy_assets

def unpublish_documents(uri: caselawclient.types.DocumentURIString) -> None:
185def unpublish_documents(uri: DocumentURIString) -> None:
186    delete_from_bucket(uri, env("PUBLIC_ASSET_BUCKET"))
def delete_documents_from_private_bucket(uri: caselawclient.types.DocumentURIString) -> None:
189def delete_documents_from_private_bucket(uri: DocumentURIString) -> None:
190    delete_from_bucket(uri, env("PRIVATE_ASSET_BUCKET"))
def announce_document_event( uri: caselawclient.types.DocumentURIString, status: str, enrich: bool = False) -> None:
193def announce_document_event(uri: DocumentURIString, status: str, enrich: bool = False) -> None:
194    client = create_sns_client()
195
196    message_attributes: dict[str, MessageAttributeValueTypeDef] = {}
197    message_attributes["update_type"] = {
198        "DataType": "String",
199        "StringValue": status,
200    }
201    message_attributes["uri_reference"] = {
202        "DataType": "String",
203        "StringValue": uri,
204    }
205    if enrich:
206        message_attributes["trigger_enrichment"] = {
207            "DataType": "String",
208            "StringValue": "1",
209        }
210
211    client.publish(
212        TopicArn=env("SNS_TOPIC"),  # this is the ANNOUNCE SNS topic
213        Message=json.dumps({"uri_reference": uri, "status": status}),
214        Subject=f"Updated: {uri} {status}",
215        MessageAttributes=message_attributes,
216    )
def upload_asset_to_private_bucket(body: bytes, s3_key: str) -> None:
219def upload_asset_to_private_bucket(body: bytes, s3_key: str) -> None:
220    """Upload an asset to the private bucket."""
221    bucket: str = env("PRIVATE_ASSET_BUCKET")
222    s3client = create_s3_client()
223    s3client.put_object(Body=body, Bucket=bucket, Key=s3_key, Tagging="pdfsource=custom-pdfs")

Upload an asset to the private bucket.

def copy_assets( old_uri: caselawclient.types.DocumentURIString, new_uri: caselawclient.types.DocumentURIString) -> None:
226def copy_assets(old_uri: DocumentURIString, new_uri: DocumentURIString) -> None:
227    """
228    Copy *unpublished* assets from one path to another,
229    renaming DOCX and PDF files as appropriate.
230    """
231    client = create_s3_client()
232    bucket = env("PRIVATE_ASSET_BUCKET")
233    response = client.list_objects(Bucket=bucket, Prefix=uri_for_s3(old_uri))
234
235    for result in response.get("Contents", []):
236        old_key = str(result["Key"])
237        new_key = build_new_key(old_key, new_uri)
238        if new_key is None:
239            continue
240        try:
241            source: CopySourceTypeDef = {"Bucket": bucket, "Key": old_key}
242            client.copy(source, bucket, new_key)
243        except botocore.client.ClientError as e:
244            logging.warning(
245                f"Unable to copy file {old_key} to new location {new_key}, error: {e}",
246            )

Copy unpublished assets from one path to another, renaming DOCX and PDF files as appropriate.

def are_unpublished_assets_clean(uri: caselawclient.types.DocumentURIString) -> bool:
249def are_unpublished_assets_clean(uri: DocumentURIString) -> bool:
250    """Returns true if all non-tar.gz assets in the relevant S3 bucket have been cleaned
251    (they have a DOCUMENT_PROCESSOR_VERSION tag)
252    Note: if there are no assets, then this returns true."""
253    client = create_s3_client()
254    bucket = env("PRIVATE_ASSET_BUCKET")
255    response = client.list_objects(Bucket=bucket, Prefix=uri_for_s3(uri))
256    for result in response.get("Contents", []):
257        file_key = str(result["Key"])
258        # ignore original tar.gz files
259        if file_key.endswith(".tar.gz"):
260            continue
261
262        # check if assets are tagged as being processed by S3
263        tag_response = client.get_object_tagging(Bucket=bucket, Key=file_key)
264        if not (any(tag["Key"] == "DOCUMENT_PROCESSOR_VERSION" for tag in tag_response["TagSet"])):
265            return False
266    return True

Returns true if all non-tar.gz assets in the relevant S3 bucket have been cleaned (that is, they have a DOCUMENT_PROCESSOR_VERSION tag). Note: if there are no assets, this returns true.

def build_new_key(old_key: str, new_uri: caselawclient.types.DocumentURIString) -> str:
269def build_new_key(old_key: str, new_uri: DocumentURIString) -> str:
270    """Ensure that DOCX and PDF filenames are modified to reflect their new home
271    as we get the name of the new S3 path"""
272    old_filename = old_key.rsplit("/", 1)[-1]
273
274    if old_filename.endswith(".docx") or old_filename.endswith(".pdf"):
275        new_filename = new_uri.replace("/", "_")
276        return f"{new_uri}/{new_filename}.{old_filename.split('.')[-1]}"
277    return f"{new_uri}/{old_filename}"

Ensure that DOCX and PDF filenames are modified to reflect their new home as we get the name of the new S3 path

def request_parse( uri: caselawclient.types.DocumentURIString, reference: Optional[str], parser_instructions: Optional[ParserInstructionsDict] = None) -> None:
280def request_parse(
281    uri: DocumentURIString,
282    reference: Optional[str],
283    parser_instructions: Optional[ParserInstructionsDict] = None,
284) -> None:
285    client = create_sns_client()
286
287    if parser_instructions is None:
288        parser_instructions = ParserInstructionsDict({})
289
290    message_to_send = {
291        "properties": {
292            "messageType": "uk.gov.nationalarchives.da.messages.request.courtdocument.parse.RequestCourtDocumentParse",
293            "timestamp": datetime.datetime.now(datetime.timezone.utc).isoformat().replace("+00:00", "Z"),
294            "function": "fcl-judgment-parse-request",
295            "producer": "FCL",
296            "executionId": str(uuid.uuid4()),
297            "parentExecutionId": None,
298        },
299        "parameters": {
300            "s3Bucket": env("PRIVATE_ASSET_BUCKET"),
301            "s3Key": generate_docx_key(uri),
302            "reference": reference or f"FCL-{str(uuid.uuid4())[:-13]}",  # uuid truncated at request of TRE
303            "originator": "FCL",
304            "parserInstructions": parser_instructions,
305        },
306    }
307
308    client.publish(
309        TopicArn=env("REPARSE_SNS_TOPIC"),
310        Message=json.dumps(message_to_send),
311        Subject=f"Reparse request: {uri}",
312    )