caselawclient.Client
1import importlib.metadata 2import json 3import logging 4import os 5import re 6import warnings 7from datetime import datetime, time, timedelta 8from pathlib import Path 9from typing import Any, Optional, Type, Union 10from xml.etree.ElementTree import Element 11 12import environ 13import requests 14from dateutil.parser import isoparse 15from defusedxml import ElementTree 16from defusedxml.ElementTree import ParseError, fromstring 17from ds_caselaw_utils.types import NeutralCitationString 18from lxml import etree 19from requests.auth import HTTPBasicAuth 20from requests.structures import CaseInsensitiveDict 21from requests_toolbelt.multipart import decoder 22 23from caselawclient import xquery_type_dicts as query_dicts 24from caselawclient.identifier_resolution import IdentifierResolutions 25from caselawclient.models.documents import ( 26 DOCUMENT_COLLECTION_URI_JUDGMENT, 27 DOCUMENT_COLLECTION_URI_PRESS_SUMMARY, 28 Document, 29) 30from caselawclient.models.documents.versions import VersionAnnotation 31from caselawclient.models.judgments import Judgment 32from caselawclient.models.press_summaries import PressSummary 33from caselawclient.models.utilities import move 34from caselawclient.search_parameters import SearchParameters 35from caselawclient.types import DocumentIdentifierSlug, DocumentIdentifierValue, DocumentLock, DocumentURIString 36from caselawclient.xquery_type_dicts import ( 37 CheckContentHashUniqueByUriDict, 38 MarkLogicDocumentURIString, 39 MarkLogicDocumentVersionURIString, 40 MarkLogicPrivilegeURIString, 41) 42 43from .content_hash import validate_content_hash 44from .errors import ( 45 DocumentNotFoundError, 46 GatewayTimeoutError, 47 MarklogicAPIError, 48 MarklogicBadRequestError, 49 MarklogicCheckoutConflictError, 50 MarklogicCommunicationError, 51 MarklogicNotPermittedError, 52 MarklogicResourceLockedError, 53 MarklogicResourceNotCheckedOutError, 54 MarklogicResourceNotFoundError, 55 MarklogicResourceUnmanagedError, 56 
MarklogicUnauthorizedError, 57 MarklogicValidationFailedError, 58) 59 60env = environ.Env() 61 62# Requests timeouts: https://requests.readthedocs.io/en/latest/user/advanced/ 63CONNECT_TIMEOUT = float(os.environ.get("CONNECT_TIMEOUT", "3.05")) 64READ_TIMEOUT = float(os.environ.get("READ_TIMEOUT", "10.0")) 65 66ROOT_DIR = os.path.dirname(os.path.realpath(__file__)) 67DEFAULT_XSL_TRANSFORM = "accessible-html.xsl" 68 69try: 70 VERSION = importlib.metadata.version("ds-caselaw-marklogic-api-client") 71except importlib.metadata.PackageNotFoundError: 72 VERSION = "0" 73DEFAULT_USER_AGENT = f"ds-caselaw-marklogic-api-client/{VERSION}" 74 75DEBUG: bool = bool(os.getenv("DEBUG", default=False)) 76 77 78class NoResponse(Exception): 79 """A requests HTTPError has no response. We expect this will never happen.""" 80 81 82class MultipartResponseLongerThanExpected(Exception): 83 """ 84 MarkLogic has returned a multipart response with more than one part, where only a single part was expected. 85 """ 86 87 88def get_multipart_strings_from_marklogic_response( 89 response: requests.Response, 90) -> list[str]: 91 """ 92 Given a HTTP response from a MarkLogic server, extract the text content from each part of the response. 
93 94 :param response: A multipart HTTP response 95 96 :return: A list of the text within each part of the response 97 """ 98 if not (response.content): 99 return [] 100 101 multipart_data = decoder.MultipartDecoder.from_response(response) 102 103 return [part.text for part in multipart_data.parts] 104 105 106def get_multipart_bytes_from_marklogic_response( 107 response: requests.Response, 108) -> list[bytes]: 109 if not (response.content): 110 return [] 111 112 multipart_data = decoder.MultipartDecoder.from_response(response) 113 114 return [part.content for part in multipart_data.parts] 115 116 117def get_single_string_from_marklogic_response( 118 response: requests.Response, 119) -> str: 120 """ 121 Given a HTTP response from a MarkLogic server, assuming the response contains a single part, extract the text 122 content of the response. 123 124 :param response: A multipart HTTP response 125 126 :return: The text of the response 127 128 :raises MultipartResponseLongerThanExpected: If the response from MarkLogic has more than one part 129 """ 130 parts = get_multipart_strings_from_marklogic_response(response) 131 part_count = len(parts) 132 133 if part_count == 0: 134 # TODO: This should strictly speaking be None, but fixing this involves refactoring a lot of other stuff which 135 # relies on "" being falsy. 136 return "" 137 138 if part_count > 1: 139 raise MultipartResponseLongerThanExpected( 140 f"Response returned {part_count} multipart items, expected 1", 141 ) 142 143 return parts[0] 144 145 146def get_single_bytestring_from_marklogic_response( 147 response: requests.Response, 148) -> bytes: 149 parts = get_multipart_bytes_from_marklogic_response(response) 150 part_count = len(parts) 151 152 if part_count == 0: 153 # TODO: This should strictly speaking be None, but fixing this involves refactoring a lot of other stuff which 154 # relies on "" being falsy. 
155 return b"" 156 157 if part_count > 1: 158 raise MultipartResponseLongerThanExpected( 159 f"Response returned {part_count} multipart items, expected 1", 160 ) 161 162 return parts[0] 163 164 165class MarklogicApiClient: 166 """ 167 The base class for interacting with a MarkLogic instance. 168 """ 169 170 http_error_classes: dict[int, Type[MarklogicAPIError]] = { 171 400: MarklogicBadRequestError, 172 401: MarklogicUnauthorizedError, 173 403: MarklogicNotPermittedError, 174 404: MarklogicResourceNotFoundError, 175 504: GatewayTimeoutError, 176 } 177 error_code_classes: dict[str, Type[MarklogicAPIError]] = { 178 "XDMP-DOCNOTFOUND": MarklogicResourceNotFoundError, 179 "XDMP-LOCKCONFLICT": MarklogicResourceLockedError, 180 "XDMP-LOCKED": MarklogicResourceLockedError, 181 "DLS-UNMANAGED": MarklogicResourceUnmanagedError, 182 "DLS-NOTCHECKEDOUT": MarklogicResourceNotCheckedOutError, 183 "DLS-CHECKOUTCONFLICT": MarklogicCheckoutConflictError, 184 "SEC-PRIVDNE": MarklogicNotPermittedError, 185 "XDMP-VALIDATE.*": MarklogicValidationFailedError, 186 "FCL-DOCUMENTNOTFOUND.*": DocumentNotFoundError, 187 } 188 189 default_http_error_class = MarklogicCommunicationError 190 191 def __init__( 192 self, 193 host: str, 194 username: str, 195 password: str, 196 use_https: bool, 197 user_agent: str = DEFAULT_USER_AGENT, 198 ) -> None: 199 self.host = host 200 self.username = username 201 self.password = password 202 self.base_url = f"{'https' if use_https else 'http'}://{self.host}:8011" 203 # Apply auth / common headers to the session 204 self.session = requests.Session() 205 self.session.auth = HTTPBasicAuth(username, password) 206 self.session.headers.update({"User-Agent": user_agent}) 207 self.user_agent = user_agent 208 209 def get_press_summaries_for_document_uri( 210 self, 211 uri: DocumentURIString, 212 ) -> list[PressSummary]: 213 """ 214 Returns a list of PressSummary objects associated with a given Document URI 215 """ 216 vars: query_dicts.GetComponentsForDocumentDict = 
{ 217 "parent_uri": uri, 218 "component": "pressSummary", 219 } 220 response = self._send_to_eval(vars, "get_components_for_document.xqy") 221 uris = get_multipart_strings_from_marklogic_response(response) 222 return [ 223 PressSummary(DocumentURIString(uri.strip("/").strip(".xml")), self) for uri in uris 224 ] # TODO: Migrate this strip behaviour into proper manipulation of a MarkLogicURIString 225 226 def get_document_by_uri( 227 self, 228 uri: DocumentURIString, 229 search_query: Optional[str] = None, 230 ) -> Document: 231 document_type_class = self.get_document_type_from_uri(uri) 232 return document_type_class(uri, self, search_query=search_query) 233 234 def get_document_type_from_uri(self, uri: DocumentURIString) -> Type[Document]: 235 vars: query_dicts.DocumentCollectionsDict = { 236 "uri": self._format_uri_for_marklogic(uri), 237 } 238 response = self._send_to_eval(vars, "document_collections.xqy") 239 collections = get_multipart_strings_from_marklogic_response(response) 240 241 if DOCUMENT_COLLECTION_URI_JUDGMENT in collections: 242 return Judgment 243 if DOCUMENT_COLLECTION_URI_PRESS_SUMMARY in collections: 244 return PressSummary 245 return Document 246 247 def _get_error_code_class(self, error_code: str) -> Type[MarklogicAPIError]: 248 """ 249 Get the exception type for a MarkLogic error code, or the first part of one 250 """ 251 for regex, error in self.error_code_classes.items(): 252 if re.fullmatch(regex, error_code): 253 return error 254 print(f"No error code match found for {error_code}") 255 return self.default_http_error_class 256 257 def _path_to_request_url(self, path: str) -> str: 258 return f"{self.base_url}/{path.lstrip('/')}" 259 260 @classmethod 261 def _get_error_code(cls, content_as_xml: Optional[str]) -> str: 262 logging.warning( 263 "XMLTools is deprecated and will be removed in later versions. 
" 264 "Use methods from MarklogicApiClient.Client instead.", 265 ) 266 if not content_as_xml: 267 return "Unknown error, Marklogic returned a null or empty response" 268 try: 269 xml = fromstring(content_as_xml) 270 return str( 271 xml.find( 272 "message-code", 273 namespaces={"": "http://marklogic.com/xdmp/error"}, 274 ).text 275 ) 276 except (ParseError, TypeError, AttributeError): 277 return "Unknown error, Marklogic returned a null or empty response" 278 279 def _raise_for_status(self, response: requests.Response) -> None: 280 try: 281 response.raise_for_status() 282 except requests.exceptions.HTTPError as e: 283 if e.response is None: 284 raise NoResponse 285 status_code = e.response.status_code 286 new_error_class = self.http_error_classes.get( 287 status_code, 288 self.default_http_error_class, 289 ) 290 try: 291 response_body = json.dumps(response.json(), indent=4) 292 except requests.JSONDecodeError: 293 response_body = response.text 294 295 if new_error_class == self.default_http_error_class: 296 # Attempt to decode the error code from the response 297 298 error_code = self._get_error_code(response.content.decode("utf-8")) 299 300 new_error_class = self._get_error_code_class(error_code) 301 302 new_exception = new_error_class( 303 f"{e}. Response body:\n{response_body}", 304 ) 305 new_exception.response = response 306 raise new_exception 307 308 def _format_uri_for_marklogic( 309 self, 310 uri: DocumentURIString, 311 ) -> MarkLogicDocumentURIString: 312 """ 313 MarkLogic requires a document URI that begins with a slash `/` and ends in `.xml`. This method ensures any takes 314 a `DocumentURIString` and converts it to a MarkLogic-friendly `MarkLogicDocumentURIString`. 315 316 :return: A `MarkLogicDocumentURIString` at which the document at the given `DocumentURIString` can be located 317 within MarkLogic. 
318 """ 319 return MarkLogicDocumentURIString(f"/{uri.lstrip('/').rstrip('/')}.xml") 320 321 def _xquery_path(self, xquery_file_name: str) -> str: 322 return os.path.join(ROOT_DIR, "xquery", xquery_file_name) 323 324 def _send_to_eval( 325 self, 326 vars: query_dicts.MarkLogicAPIDict, 327 xquery_file_name: str, 328 timeout: tuple[float, float] = (CONNECT_TIMEOUT, READ_TIMEOUT), 329 ) -> requests.Response: 330 return self.eval( 331 self._xquery_path(xquery_file_name), 332 vars=json.dumps(vars), 333 accept_header="application/xml", 334 timeout=timeout, 335 ) 336 337 def _eval_and_decode( 338 self, 339 vars: query_dicts.MarkLogicAPIDict, 340 xquery_file_name: str, 341 ) -> str: 342 response = self._send_to_eval(vars, xquery_file_name) 343 return get_single_string_from_marklogic_response(response) 344 345 def _eval_as_bytes( 346 self, 347 vars: query_dicts.MarkLogicAPIDict, 348 xquery_file_name: str, 349 ) -> bytes: 350 response = self._send_to_eval(vars, xquery_file_name) 351 return get_single_bytestring_from_marklogic_response(response) 352 353 def prepare_request_kwargs( 354 self, 355 method: str, 356 path: str, 357 body: Optional[str] = None, 358 data: Optional[dict[str, Any]] = None, 359 ) -> dict[str, Any]: 360 kwargs = dict(url=self._path_to_request_url(path)) 361 if data is not None: 362 data = {k: v for k, v in data.items() if v is not None} 363 if method == "GET": 364 kwargs["params"] = data # type: ignore 365 else: 366 kwargs["data"] = json.dumps(data) 367 if body is not None: 368 kwargs["data"] = body 369 return kwargs 370 371 def make_request( 372 self, 373 method: str, 374 path: str, 375 headers: CaseInsensitiveDict[Union[str, Any]], 376 body: Optional[str] = None, 377 data: Optional[dict[str, Any]] = None, 378 ) -> requests.Response: 379 kwargs = self.prepare_request_kwargs(method, path, body, data) 380 self.session.headers = headers 381 response = self.session.request(method, **kwargs) 382 # Raise relevant exception for an erroneous response 383 
self._raise_for_status(response) 384 return response 385 386 def GET(self, path: str, headers: dict[str, Any], **data: Any) -> requests.Response: 387 logging.warning("GET() is deprecated, use eval() or invoke()") 388 return self.make_request("GET", path, headers, data) # type: ignore 389 390 def POST( 391 self, 392 path: str, 393 headers: dict[str, Any], 394 **data: Any, 395 ) -> requests.Response: 396 logging.warning("POST() is deprecated, use eval() or invoke()") 397 return self.make_request("POST", path, headers, data) # type: ignore 398 399 def document_exists(self, document_uri: DocumentURIString) -> bool: 400 uri = self._format_uri_for_marklogic(document_uri) 401 vars: query_dicts.DocumentExistsDict = { 402 "uri": uri, 403 } 404 decoded_response = self._eval_and_decode(vars, "document_exists.xqy") 405 406 if decoded_response == "true": 407 return True 408 if decoded_response == "false": 409 return False 410 raise RuntimeError("Marklogic response was neither true nor false") 411 412 def get_judgment_xml_bytestring( 413 self, 414 judgment_uri: DocumentURIString, 415 version_uri: Optional[DocumentURIString] = None, 416 show_unpublished: bool = False, 417 search_query: Optional[str] = None, 418 ) -> bytes: 419 marklogic_document_uri = self._format_uri_for_marklogic(judgment_uri) 420 marklogic_document_version_uri = ( 421 MarkLogicDocumentVersionURIString( 422 self._format_uri_for_marklogic(version_uri), 423 ) 424 if version_uri 425 else None 426 ) 427 show_unpublished = self.verify_show_unpublished(show_unpublished) 428 429 vars: query_dicts.GetJudgmentDict = { 430 "uri": marklogic_document_uri, 431 "version_uri": marklogic_document_version_uri, 432 "show_unpublished": show_unpublished, 433 "search_query": search_query, 434 } 435 436 response = self._eval_as_bytes(vars, "get_judgment.xqy") 437 if not response: 438 raise MarklogicNotPermittedError( 439 "The document is not published and show_unpublished was not set", 440 ) 441 442 return response 443 444 def 
get_judgment_xml( 445 self, 446 judgment_uri: DocumentURIString, 447 version_uri: Optional[DocumentURIString] = None, 448 show_unpublished: bool = False, 449 search_query: Optional[str] = None, 450 ) -> str: 451 return self.get_judgment_xml_bytestring( 452 judgment_uri, 453 version_uri, 454 show_unpublished, 455 search_query=search_query, 456 ).decode(encoding="utf-8") 457 458 def set_document_name( 459 self, 460 document_uri: DocumentURIString, 461 content: str, 462 ) -> requests.Response: 463 uri = self._format_uri_for_marklogic(document_uri) 464 vars: query_dicts.SetMetadataNameDict = {"uri": uri, "content": content} 465 return self._send_to_eval(vars, "set_metadata_name.xqy") 466 467 def set_judgment_date( 468 self, 469 judgment_uri: DocumentURIString, 470 content: str, 471 ) -> requests.Response: 472 warnings.warn( 473 "set_judgment_date() is deprecated, use set_document_work_expression_date()", 474 DeprecationWarning, 475 stacklevel=2, 476 ) 477 return self.set_document_work_expression_date(judgment_uri, content) 478 479 def set_document_work_expression_date( 480 self, 481 document_uri: DocumentURIString, 482 content: str, 483 ) -> requests.Response: 484 uri = self._format_uri_for_marklogic(document_uri) 485 vars: query_dicts.SetMetadataWorkExpressionDateDict = { 486 "uri": uri, 487 "content": content, 488 } 489 490 return self._send_to_eval(vars, "set_metadata_work_expression_date.xqy") 491 492 def set_judgment_citation( 493 self, 494 judgment_uri: DocumentURIString, 495 content: str, 496 ) -> requests.Response: 497 uri = self._format_uri_for_marklogic(judgment_uri) 498 vars: query_dicts.SetMetadataCitationDict = { 499 "uri": uri, 500 "content": content.strip(), 501 } 502 503 return self._send_to_eval(vars, "set_metadata_citation.xqy") 504 505 def set_document_court( 506 self, 507 document_uri: DocumentURIString, 508 content: str, 509 ) -> requests.Response: 510 uri = self._format_uri_for_marklogic(document_uri) 511 vars: query_dicts.SetMetadataCourtDict = 
{"uri": uri, "content": content} 512 513 return self._send_to_eval(vars, "set_metadata_court.xqy") 514 515 def set_document_jurisdiction( 516 self, 517 document_uri: DocumentURIString, 518 content: str, 519 ) -> requests.Response: 520 uri = self._format_uri_for_marklogic(document_uri) 521 vars: query_dicts.SetMetadataJurisdictionDict = {"uri": uri, "content": content} 522 return self._send_to_eval(vars, "set_metadata_jurisdiction.xqy") 523 524 def set_document_court_and_jurisdiction( 525 self, 526 document_uri: DocumentURIString, 527 content: str, 528 ) -> requests.Response: 529 if "/" in content: 530 court, jurisdiction = re.split("\\s*/\\s*", content) 531 self.set_document_court(document_uri, court) 532 return self.set_document_jurisdiction(document_uri, jurisdiction) 533 self.set_document_court(document_uri, content) 534 return self.set_document_jurisdiction(document_uri, "") 535 536 def set_judgment_this_uri( 537 self, 538 judgment_uri: DocumentURIString, 539 ) -> requests.Response: 540 uri = self._format_uri_for_marklogic(judgment_uri) 541 content_with_id = f"https://caselaw.nationalarchives.gov.uk/id/{judgment_uri.lstrip('/')}" 542 content_without_id = f"https://caselaw.nationalarchives.gov.uk/{judgment_uri.lstrip('/')}" 543 content_with_xml = f"https://caselaw.nationalarchives.gov.uk/{judgment_uri.lstrip('/')}/data.xml" 544 vars: query_dicts.SetMetadataThisUriDict = { 545 "uri": uri, 546 "content_with_id": content_with_id, 547 "content_without_id": content_without_id, 548 "content_with_xml": content_with_xml, 549 } 550 551 return self._send_to_eval(vars, "set_metadata_this_uri.xqy") 552 553 def save_locked_judgment_xml( 554 self, 555 judgment_uri: DocumentURIString, 556 judgment_xml: bytes, 557 annotation: VersionAnnotation, 558 ) -> requests.Response: 559 """assumes the judgment is already locked, does not unlock/check in 560 note this version assumes the XML is raw bytes, rather than a tree...""" 561 562 validate_content_hash(judgment_xml) 563 uri = 
self._format_uri_for_marklogic(judgment_uri) 564 565 annotation.set_calling_function("save_locked_judgment_xml") 566 annotation.set_calling_agent(self.user_agent) 567 568 vars: query_dicts.UpdateLockedJudgmentDict = { 569 "uri": uri, 570 "judgment": judgment_xml.decode("utf-8"), 571 "annotation": annotation.as_json, 572 } 573 574 return self._send_to_eval(vars, "update_locked_judgment.xqy") 575 576 def insert_document_xml( 577 self, 578 document_uri: DocumentURIString, 579 document_xml: Element, 580 document_type: type[Document], 581 annotation: VersionAnnotation, 582 ) -> requests.Response: 583 """ 584 Insert a new XML document into MarkLogic. 585 586 :param document_uri: The URI to insert the document at 587 :param document_xml: The XML of the document to insert 588 :param document_type: The type class of the document 589 :param annotation: Annotations to record alongside this version 590 591 :return: The response object from MarkLogic 592 """ 593 xml = ElementTree.tostring(document_xml) 594 595 uri = self._format_uri_for_marklogic(document_uri) 596 597 annotation.set_calling_function("insert_document_xml") 598 annotation.set_calling_agent(self.user_agent) 599 600 vars: query_dicts.InsertDocumentDict = { 601 "uri": uri, 602 "type_collection": document_type.type_collection_name, 603 "document": xml.decode("utf-8"), 604 "annotation": annotation.as_json, 605 } 606 607 return self._send_to_eval(vars, "insert_document.xqy") 608 609 def update_document_xml( 610 self, 611 document_uri: DocumentURIString, 612 document_xml: Element, 613 annotation: VersionAnnotation, 614 ) -> requests.Response: 615 """ 616 Updates an existing XML document in MarkLogic with a new version. 617 618 This uses `dls:document-checkout-update-checkin` to perform this in a single operation. 
619 620 :param document_uri: The URI of the document to update 621 :param document_xml: The new XML content of the document 622 :param annotation: Annotations to record alongside this version 623 624 :return: The response object from MarkLogic 625 """ 626 xml = ElementTree.tostring(document_xml) 627 628 uri = self._format_uri_for_marklogic(document_uri) 629 630 annotation.set_calling_function("update_document_xml") 631 annotation.set_calling_agent(self.user_agent) 632 633 vars: query_dicts.UpdateDocumentDict = { 634 "uri": uri, 635 "judgment": xml.decode("utf-8"), 636 "annotation": annotation.as_json, 637 } 638 639 return self._send_to_eval(vars, "update_document.xqy") 640 641 def list_judgment_versions( 642 self, 643 judgment_uri: DocumentURIString, 644 ) -> requests.Response: 645 uri = self._format_uri_for_marklogic(judgment_uri) 646 vars: query_dicts.ListJudgmentVersionsDict = {"uri": uri} 647 648 return self._send_to_eval(vars, "list_judgment_versions.xqy") 649 650 def checkout_judgment( 651 self, 652 judgment_uri: DocumentURIString, 653 annotation: str = "", 654 expires_at_midnight: bool = False, 655 timeout_seconds: int = -1, 656 ) -> requests.Response: 657 """If timeout_seconds is -1, the lock never times out""" 658 uri = self._format_uri_for_marklogic(judgment_uri) 659 vars: query_dicts.CheckoutJudgmentDict = { 660 "uri": uri, 661 "annotation": annotation, 662 "timeout": timeout_seconds, 663 } 664 665 if expires_at_midnight: 666 timeout = self.calculate_seconds_until_midnight() 667 vars["timeout"] = timeout 668 669 return self._send_to_eval(vars, "checkout_judgment.xqy") 670 671 def checkin_judgment(self, judgment_uri: DocumentURIString) -> requests.Response: 672 uri = self._format_uri_for_marklogic(judgment_uri) 673 vars: query_dicts.CheckinJudgmentDict = {"uri": uri} 674 675 return self._send_to_eval(vars, "checkin_judgment.xqy") 676 677 def get_judgment_checkout_status( 678 self, 679 judgment_uri: DocumentURIString, 680 ) -> requests.Response: 681 uri = 
self._format_uri_for_marklogic(judgment_uri) 682 vars: query_dicts.GetJudgmentCheckoutStatusDict = {"uri": uri} 683 684 return self._send_to_eval(vars, "get_judgment_checkout_status.xqy") 685 686 def get_judgment_checkout_status_message( 687 self, 688 judgment_uri: DocumentURIString, 689 ) -> Optional[str]: 690 """Return the annotation of the lock or `None` if there is no lock.""" 691 response = self.get_judgment_checkout_status(judgment_uri) 692 if not response.content: 693 return None 694 content = decoder.MultipartDecoder.from_response(response).parts[0].text 695 if content == "": 696 return None 697 response_xml = ElementTree.fromstring(content) 698 return str( 699 response_xml.find( 700 "dls:annotation", 701 namespaces={"dls": "http://marklogic.com/xdmp/dls"}, 702 ).text 703 ) 704 705 def get_judgment_version( 706 self, 707 judgment_uri: DocumentURIString, 708 version: int, 709 ) -> requests.Response: 710 uri = self._format_uri_for_marklogic(judgment_uri) 711 vars: query_dicts.GetJudgmentVersionDict = {"uri": uri, "version": str(version)} 712 713 return self._send_to_eval(vars, "get_judgment_version.xqy") 714 715 def validate_document(self, document_uri: DocumentURIString) -> bool: 716 vars: query_dicts.ValidateDocumentDict = { 717 "uri": self._format_uri_for_marklogic(document_uri), 718 } 719 response = self._send_to_eval(vars, "validate_document.xqy") 720 content = decoder.MultipartDecoder.from_response(response).parts[0].text 721 xml = ElementTree.fromstring(content) 722 return ( 723 len( 724 xml.findall( 725 ".//error:error", 726 {"error": "http://marklogic.com/xdmp/error"}, 727 ), 728 ) 729 == 0 730 ) 731 732 def has_unique_content_hash(self, judgment_uri: DocumentURIString) -> bool: 733 """ 734 Returns True if the content hash for this document is unique (not shared with other documents). 
735 """ 736 uri = self._format_uri_for_marklogic(judgment_uri) 737 vars: CheckContentHashUniqueByUriDict = {"uri": uri} 738 return self._eval_and_decode(vars, "check_content_hash_unique_by_uri.xqy") == "true" 739 740 def eval( 741 self, 742 xquery_path: str, 743 vars: str, 744 accept_header: str = "multipart/mixed", 745 timeout: tuple[float, float] = (CONNECT_TIMEOUT, READ_TIMEOUT), 746 ) -> requests.Response: 747 headers = { 748 "Content-type": "application/x-www-form-urlencoded", 749 "Accept": accept_header, 750 } 751 data = { 752 "xquery": Path(xquery_path).read_text(), 753 "vars": vars, 754 } 755 path = "LATEST/eval" 756 757 if DEBUG: 758 print(f"Sending {vars} to {xquery_path}") 759 760 response = self.session.request( 761 "POST", 762 url=self._path_to_request_url(path), 763 headers=headers, 764 data=data, 765 timeout=timeout, 766 ) 767 # Raise relevant exception for an erroneous response 768 self._raise_for_status(response) 769 return response 770 771 def invoke( 772 self, 773 module: str, 774 vars: str, 775 accept_header: str = "multipart/mixed", 776 ) -> requests.Response: 777 headers = { 778 "Content-type": "application/x-www-form-urlencoded", 779 "Accept": accept_header, 780 } 781 data = { 782 "module": module, 783 "vars": vars, 784 } 785 path = "LATEST/invoke" 786 response = self.session.request( 787 "POST", 788 url=self._path_to_request_url(path), 789 headers=headers, 790 data=data, 791 ) 792 # Raise relevant exception for an erroneous response 793 self._raise_for_status(response) 794 return response 795 796 def advanced_search(self, search_parameters: SearchParameters) -> requests.Response: 797 """ 798 Performs a search on the entire document set. 
799 800 :param query: 801 :param court: 802 :param judge: 803 :param party: 804 :param neutral_citation: 805 :param document_name: 806 :param consignment_number: 807 :param specific_keyword: 808 :param order: 809 :param date_from: 810 :param date_to: 811 :param page: 812 :param page_size: 813 :param show_unpublished: If True, both published and unpublished documents will be returned 814 :param only_unpublished: If True, will only return published documents. Ignores the value of show_unpublished 815 :param collections: 816 :return: 817 """ 818 module = "/judgments/search/search-v2.xqy" # as stored on Marklogic 819 search_parameters.show_unpublished = self.verify_show_unpublished( 820 search_parameters.show_unpublished, 821 ) 822 vars = json.dumps(search_parameters.as_marklogic_payload()) 823 return self.invoke(module, vars) 824 825 def eval_xslt( 826 self, 827 judgment_uri: DocumentURIString, 828 version_uri: Optional[DocumentURIString] = None, 829 show_unpublished: bool = False, 830 xsl_filename: str = DEFAULT_XSL_TRANSFORM, 831 query: Optional[str] = None, 832 ) -> requests.Response: 833 marklogic_document_uri = self._format_uri_for_marklogic(judgment_uri) 834 marklogic_document_version_uri = ( 835 MarkLogicDocumentVersionURIString( 836 self._format_uri_for_marklogic(version_uri), 837 ) 838 if version_uri 839 else None 840 ) 841 842 image_location = os.getenv("XSLT_IMAGE_LOCATION", "") 843 844 show_unpublished = self.verify_show_unpublished(show_unpublished) 845 846 vars: query_dicts.XsltTransformDict = { 847 "uri": marklogic_document_uri, 848 "version_uri": marklogic_document_version_uri, 849 "show_unpublished": show_unpublished, 850 "img_location": image_location, 851 "xsl_filename": xsl_filename, 852 "query": query, 853 } 854 855 return self._send_to_eval(vars, "xslt_transform.xqy") 856 857 def accessible_judgment_transformation( 858 self, 859 judgment_uri: DocumentURIString, 860 version_uri: Optional[DocumentURIString] = None, 861 show_unpublished: bool = 
False, 862 ) -> requests.Response: 863 return self.eval_xslt( 864 judgment_uri, 865 version_uri, 866 show_unpublished, 867 xsl_filename=DEFAULT_XSL_TRANSFORM, 868 ) 869 870 def original_judgment_transformation( 871 self, 872 judgment_uri: DocumentURIString, 873 version_uri: Optional[DocumentURIString] = None, 874 show_unpublished: bool = False, 875 ) -> requests.Response: 876 return self.eval_xslt( 877 judgment_uri, 878 version_uri, 879 show_unpublished, 880 xsl_filename="as-handed-down.xsl", 881 ) 882 883 def get_property(self, judgment_uri: DocumentURIString, name: str) -> str: 884 uri = self._format_uri_for_marklogic(judgment_uri) 885 vars: query_dicts.GetPropertyDict = { 886 "uri": uri, 887 "name": name, 888 } 889 return self._eval_and_decode(vars, "get_property.xqy") 890 891 def get_property_as_node(self, judgment_uri: DocumentURIString, name: str) -> Optional[etree._Element]: 892 uri = self._format_uri_for_marklogic(judgment_uri) 893 vars: query_dicts.GetPropertyAsNodeDict = { 894 "uri": uri, 895 "name": name, 896 } 897 value = self._eval_and_decode(vars, "get_property_as_node.xqy") 898 if not value: 899 return None 900 return etree.fromstring(value) 901 902 def get_version_annotation(self, judgment_uri: DocumentURIString) -> str: 903 uri = self._format_uri_for_marklogic(judgment_uri) 904 vars: query_dicts.GetVersionAnnotationDict = { 905 "uri": uri, 906 } 907 return self._eval_and_decode(vars, "get_version_annotation.xqy") 908 909 def get_version_created_datetime(self, judgment_uri: DocumentURIString) -> datetime: 910 uri = self._format_uri_for_marklogic(judgment_uri) 911 vars: query_dicts.GetVersionCreatedDict = { 912 "uri": uri, 913 } 914 return datetime.strptime( 915 self._eval_and_decode(vars, "get_version_created.xqy"), 916 "%Y-%m-%dT%H:%M:%S.%f%z", 917 ) 918 919 def set_property( 920 self, 921 judgment_uri: DocumentURIString, 922 name: str, 923 value: str, 924 ) -> requests.Response: 925 uri = self._format_uri_for_marklogic(judgment_uri) 926 vars: 
query_dicts.SetPropertyDict = { 927 "uri": uri, 928 "value": value, 929 "name": name, 930 } 931 932 return self._send_to_eval(vars, "set_property.xqy") 933 934 def set_property_as_node( 935 self, 936 judgment_uri: DocumentURIString, 937 name: str, 938 value: etree._Element, 939 ) -> requests.Response: 940 """Given a root node, set the value of the MarkLogic property for a document to the _contents_ of that root node. The root node itself is discarded.""" 941 uri = self._format_uri_for_marklogic(judgment_uri) 942 vars: query_dicts.SetPropertyAsNodeDict = { 943 "uri": uri, 944 "value": etree.tostring(value).decode(), 945 "name": name, 946 } 947 948 return self._send_to_eval(vars, "set_property_as_node.xqy") 949 950 def set_boolean_property( 951 self, 952 judgment_uri: DocumentURIString, 953 name: str, 954 value: bool, 955 ) -> requests.Response: 956 uri = self._format_uri_for_marklogic(judgment_uri) 957 string_value = "true" if value else "false" 958 vars: query_dicts.SetBooleanPropertyDict = { 959 "uri": uri, 960 "value": string_value, 961 "name": name, 962 } 963 """ 964 Set a property within MarkLogic which is specifically a boolean. 965 966 Since XML has no concept of boolean, the actual value in the database is set to `"true"` or `"false"`. 967 """ 968 return self._send_to_eval(vars, "set_boolean_property.xqy") 969 970 def get_boolean_property(self, judgment_uri: DocumentURIString, name: str) -> bool: 971 """ 972 Get a property from MarkLogic which is specifically a boolean. 
973 974 :return: `True` if the property exists and has a value of `"true"`, otherwise `False` 975 """ 976 content = self.get_property(judgment_uri, name) 977 return content == "true" 978 979 def set_datetime_property( 980 self, 981 judgment_uri: DocumentURIString, 982 name: str, 983 value: datetime, 984 ) -> requests.Response: 985 """Set a property within MarkLogic which is specifically a datetime.""" 986 uri = self._format_uri_for_marklogic(judgment_uri) 987 vars: query_dicts.SetDatetimePropertyDict = { 988 "uri": uri, 989 "value": value.isoformat(), 990 "name": name, 991 } 992 return self._send_to_eval(vars, "set_datetime_property.xqy") 993 994 def get_datetime_property(self, judgment_uri: DocumentURIString, name: str) -> Optional[datetime]: 995 """ 996 Get a property from MarkLogic which is specifically a datetime. 997 998 :return: A datetime with the value of the property, or `None` if it does not exist 999 """ 1000 content = self.get_property(judgment_uri, name) 1001 1002 if content: 1003 return isoparse(content) 1004 1005 return None 1006 1007 def set_published( 1008 self, 1009 judgment_uri: DocumentURIString, 1010 published: bool, 1011 ) -> requests.Response: 1012 return self.set_boolean_property(judgment_uri, "published", published) 1013 1014 def get_published(self, judgment_uri: DocumentURIString) -> bool: 1015 return self.get_boolean_property(judgment_uri, "published") 1016 1017 def get_last_modified(self, judgment_uri: DocumentURIString) -> str: 1018 uri = self._format_uri_for_marklogic(judgment_uri) 1019 vars: query_dicts.GetLastModifiedDict = { 1020 "uri": uri, 1021 } 1022 1023 response = self._send_to_eval(vars, "get_last_modified.xqy") 1024 1025 if not response.text: 1026 return "" 1027 1028 content = str(decoder.MultipartDecoder.from_response(response).parts[0].text) 1029 return content 1030 1031 def delete_judgment(self, judgment_uri: DocumentURIString) -> requests.Response: 1032 uri = self._format_uri_for_marklogic(judgment_uri) 1033 vars: 
query_dicts.DeleteJudgmentDict = {"uri": uri} 1034 return self._send_to_eval(vars, "delete_judgment.xqy") 1035 1036 def copy_document( 1037 self, 1038 old: DocumentURIString, 1039 new: DocumentURIString, 1040 ) -> requests.Response: 1041 old_uri = self._format_uri_for_marklogic(old) 1042 new_uri = self._format_uri_for_marklogic(new) 1043 1044 vars: query_dicts.CopyDocumentDict = { 1045 "old_uri": old_uri, 1046 "new_uri": new_uri, 1047 } 1048 return self._send_to_eval(vars, "copy_document.xqy") 1049 1050 def break_checkout(self, judgment_uri: DocumentURIString) -> requests.Response: 1051 uri = self._format_uri_for_marklogic(judgment_uri) 1052 vars: query_dicts.BreakJudgmentCheckoutDict = { 1053 "uri": uri, 1054 } 1055 return self._send_to_eval(vars, "break_judgment_checkout.xqy") 1056 1057 def user_has_privilege( 1058 self, 1059 username: str, 1060 privilege_uri: MarkLogicPrivilegeURIString, 1061 privilege_action: str, 1062 ) -> requests.Response: 1063 vars: query_dicts.UserHasPrivilegeDict = { 1064 "user": username, 1065 "privilege_uri": privilege_uri, 1066 "privilege_action": privilege_action, 1067 } 1068 return self._send_to_eval(vars, "user_has_privilege.xqy") 1069 1070 def user_can_view_unpublished_judgments(self, username: str) -> bool: 1071 if self.user_has_admin_role(username): 1072 return True 1073 1074 check_privilege = self.user_has_privilege( 1075 username, 1076 MarkLogicPrivilegeURIString( 1077 "https://caselaw.nationalarchives.gov.uk/custom/privileges/can-view-unpublished-documents", 1078 ), 1079 "execute", 1080 ) 1081 return get_single_string_from_marklogic_response(check_privilege).lower() == "true" 1082 1083 def user_has_role(self, username: str, role: str) -> requests.Response: 1084 vars: query_dicts.UserHasRoleDict = { 1085 "user": username, 1086 "role": role, 1087 } 1088 return self._send_to_eval(vars, "user_has_role.xqy") 1089 1090 def user_has_admin_role(self, username: str) -> bool: 1091 check_role = self.user_has_role( 1092 username, 1093 
"admin", 1094 ) 1095 multipart_data = decoder.MultipartDecoder.from_response(check_role) 1096 result = str(multipart_data.parts[0].text) 1097 return result.lower() == "true" 1098 1099 def calculate_seconds_until_midnight(self, now: Optional[datetime] = None) -> int: 1100 """ 1101 Get timedelta until end of day on the datetime passed, or current time. 1102 https://stackoverflow.com/questions/45986035/seconds-until-end-of-day-in-python 1103 """ 1104 if not now: 1105 now = datetime.now() 1106 tomorrow = now + timedelta(days=1) 1107 difference = datetime.combine(tomorrow, time.min) - now 1108 1109 return difference.seconds 1110 1111 def verify_show_unpublished(self, show_unpublished: bool) -> bool: 1112 if show_unpublished and not self.user_can_view_unpublished_judgments( 1113 self.username, 1114 ): 1115 return False 1116 return show_unpublished 1117 1118 def get_properties_for_search_results( 1119 self, 1120 judgment_uris: list[DocumentURIString], 1121 ) -> str: 1122 uris = [self._format_uri_for_marklogic(judgment_uri) for judgment_uri in judgment_uris] 1123 vars: query_dicts.GetPropertiesForSearchResultsDict = {"uris": uris} 1124 response = self._send_to_eval(vars, "get_properties_for_search_results.xqy") 1125 return get_single_string_from_marklogic_response(response) 1126 1127 def search_and_decode_response(self, search_parameters: SearchParameters) -> bytes: 1128 response = self.advanced_search(search_parameters) 1129 return get_single_bytestring_from_marklogic_response(response) 1130 1131 def search_judgments_and_decode_response( 1132 self, 1133 search_parameters: SearchParameters, 1134 ) -> bytes: 1135 search_parameters.collections = [DOCUMENT_COLLECTION_URI_JUDGMENT] 1136 return self.search_and_decode_response(search_parameters) 1137 1138 def update_document_uri(self, old_uri: DocumentURIString, new_citation: NeutralCitationString) -> DocumentURIString: 1139 """ 1140 Move the document at old_uri to the correct location based on the neutral citation 1141 The new 
neutral citation *must* not already exist (that is handled elsewhere) 1142 This might not be needed; changing the URI/neutral citation is vanishingly rare 1143 """ 1144 return move.update_document_uri(old_uri, new_citation, api_client=self) 1145 1146 def get_combined_stats_table(self) -> list[list[Any]]: 1147 """Run the combined statistics table xquery and return the result as a list of lists, each representing a table 1148 row.""" 1149 results: list[list[Any]] = json.loads( 1150 get_single_string_from_marklogic_response( 1151 self._send_to_eval({}, "get_combined_stats_table.xqy"), 1152 ), 1153 ) 1154 1155 return results 1156 1157 def get_highest_enrichment_version(self) -> tuple[int, int]: 1158 """This gets the highest enrichment version in the database, 1159 so if nothing has been enriched with the most recent version of enrichment, 1160 this won't reflect that change.""" 1161 table = json.loads( 1162 get_single_string_from_marklogic_response( 1163 self._send_to_eval( 1164 {}, 1165 "get_highest_enrichment_version.xqy", 1166 ), 1167 ), 1168 ) 1169 1170 return (int(table[1][1]), int(table[1][2])) 1171 1172 def get_pending_enrichment_for_version( 1173 self, 1174 target_enrichment_version: tuple[int, int], 1175 target_parser_version: tuple[int, int], 1176 maximum_records: int = 1000, 1177 ) -> list[list[Any]]: 1178 """Retrieve documents which are not yet enriched with a given version.""" 1179 vars: query_dicts.GetPendingEnrichmentForVersionDict = { 1180 "target_enrichment_major_version": target_enrichment_version[0], 1181 "target_enrichment_minor_version": target_enrichment_version[1], 1182 "target_parser_major_version": target_parser_version[0], 1183 "target_parser_minor_version": target_parser_version[1], 1184 "maximum_records": maximum_records, 1185 } 1186 results: list[list[Any]] = json.loads( 1187 get_single_string_from_marklogic_response( 1188 self._send_to_eval( 1189 vars, 1190 "get_pending_enrichment_for_version.xqy", 1191 ), 1192 ), 1193 ) 1194 1195 return 
results 1196 1197 def get_recently_enriched( 1198 self, 1199 ) -> list[list[Any]]: 1200 """Retrieve documents which are not yet enriched with a given version.""" 1201 results: list[list[Any]] = json.loads( 1202 get_single_string_from_marklogic_response( 1203 self._send_to_eval( 1204 {}, 1205 "get_recently_enriched.xqy", 1206 ), 1207 ), 1208 ) 1209 1210 return results 1211 1212 def get_highest_parser_version(self) -> tuple[int, int]: 1213 """This gets the highest parser version in the database, so if nothing has been parsed with the most recent version of the parser, this won't reflect that change.""" 1214 table = json.loads( 1215 get_single_string_from_marklogic_response( 1216 self._send_to_eval( 1217 {}, 1218 "get_highest_parser_version.xqy", 1219 ), 1220 ), 1221 ) 1222 1223 return (int(table[1][1]), int(table[1][2])) 1224 1225 def get_documents_pending_parse_for_version( 1226 self, 1227 target_version: tuple[int, int], 1228 maximum_records: int = 1000, 1229 ) -> list[list[Any]]: 1230 """Retrieve a list of documents which are not yet parsed with a given version.""" 1231 vars: query_dicts.GetPendingParseForVersionDocumentsDict = { 1232 "target_major_version": target_version[0], 1233 "target_minor_version": target_version[1], 1234 "maximum_records": maximum_records, 1235 } 1236 results: list[list[Any]] = json.loads( 1237 get_single_string_from_marklogic_response( 1238 self._send_to_eval( 1239 vars, 1240 "get_pending_parse_for_version_documents.xqy", 1241 ), 1242 ), 1243 ) 1244 1245 return results 1246 1247 def get_count_pending_parse_for_version( 1248 self, 1249 target_version: tuple[int, int], 1250 ) -> int: 1251 """Get the total number of documents which are not yet parsed with a given version.""" 1252 vars: query_dicts.GetPendingParseForVersionCountDict = { 1253 "target_major_version": target_version[0], 1254 "target_minor_version": target_version[1], 1255 } 1256 results = json.loads( 1257 get_single_string_from_marklogic_response( 1258 self._send_to_eval( 1259 
vars, 1260 "get_pending_parse_for_version_count.xqy", 1261 ), 1262 ), 1263 ) 1264 1265 return int(results[1][0]) 1266 1267 def get_recently_parsed( 1268 self, 1269 ) -> list[list[Any]]: 1270 """Retrieve documents which are not yet enriched with a given version.""" 1271 results: list[list[Any]] = json.loads( 1272 get_single_string_from_marklogic_response( 1273 self._send_to_eval( 1274 {}, 1275 "get_recently_parsed.xqy", 1276 ), 1277 ), 1278 ) 1279 1280 return results 1281 1282 def get_locked_documents( 1283 self, 1284 ) -> list[DocumentLock]: 1285 """Retrieve all currently locked documents.""" 1286 results = [ 1287 DocumentLock.from_string(lock) 1288 for lock in get_multipart_strings_from_marklogic_response( 1289 self._send_to_eval({}, "get_locked_documents.xqy") 1290 ) 1291 ] 1292 1293 return sorted(results, key=lambda lock: lock.timestamp) 1294 1295 def get_missing_fclid( 1296 self, 1297 maximum_records: int = 50, 1298 ) -> list[str]: 1299 """Retrieve the URIs of published documents which do not have an identifier in the `fclid` schema.""" 1300 vars: query_dicts.GetMissingFclidDict = { 1301 "maximum_records": maximum_records, 1302 } 1303 1304 results: list[str] = get_multipart_strings_from_marklogic_response( 1305 self._send_to_eval( 1306 vars, 1307 "get_missing_fclid.xqy", 1308 ) 1309 ) 1310 1311 return results 1312 1313 def resolve_from_identifier_slug( 1314 self, identifier_slug: DocumentIdentifierSlug, published_only: bool = True 1315 ) -> IdentifierResolutions: 1316 """Given a PUI/EUI url, look up the precomputed slug and return the 1317 MarkLogic document URIs which match that slug. 
Multiple returns should be anticipated""" 1318 vars: query_dicts.ResolveFromIdentifierSlugDict = { 1319 "identifier_slug": identifier_slug, 1320 "published_only": int(published_only), 1321 } 1322 raw_results: list[str] = get_multipart_strings_from_marklogic_response( 1323 self._send_to_eval( 1324 vars, 1325 "resolve_from_identifier_slug.xqy", 1326 ), 1327 ) 1328 return IdentifierResolutions.from_marklogic_output(raw_results) 1329 1330 def resolve_from_identifier_value( 1331 self, identifier_value: DocumentIdentifierValue, published_only: bool = True 1332 ) -> IdentifierResolutions: 1333 """Given a PUI/EUI url, look up the precomputed slug and return the 1334 MarkLogic document URIs which match that slug. Multiple returns should be anticipated""" 1335 vars: query_dicts.ResolveFromIdentifierValueDict = { 1336 "identifier_value": identifier_value, 1337 "published_only": int(published_only), 1338 } 1339 raw_results: list[str] = get_multipart_strings_from_marklogic_response( 1340 self._send_to_eval( 1341 vars, 1342 "resolve_from_identifier_value.xqy", 1343 ), 1344 ) 1345 return IdentifierResolutions.from_marklogic_output(raw_results) 1346 1347 def get_next_document_sequence_number(self) -> int: 1348 """Increment the MarkLogic sequence number by one and return the value.""" 1349 return int(self._eval_and_decode({}, "get_next_document_sequence_number.xqy"))
class NoResponse(Exception):
    """
    Raised when a ``requests`` ``HTTPError`` arrives without an attached response object.

    This is not expected to happen in practice.
    """
A requests HTTPError has no response. We expect this will never happen.
class MultipartResponseLongerThanExpected(Exception):
    """
    Raised when a multipart response from MarkLogic holds more than one part but exactly one part was expected.
    """
MarkLogic has returned a multipart response with more than one part, where only a single part was expected.
def get_multipart_strings_from_marklogic_response(
    response: requests.Response,
) -> list[str]:
    """
    Collect the text of every part of a multipart HTTP response returned by a MarkLogic server.

    :param response: A multipart HTTP response

    :return: One string per part, in the order the parts appear; an empty list if the response has no content
    """
    # A response with no body is treated as "zero parts" and is never handed to the multipart decoder.
    if not response.content:
        return []

    texts: list[str] = []
    for part in decoder.MultipartDecoder.from_response(response).parts:
        texts.append(part.text)

    return texts
Given an HTTP response from a MarkLogic server, extract the text content from each part of the response.
Parameters
- response: A multipart HTTP response
Returns
A list of the text within each part of the response
def get_single_string_from_marklogic_response(
    response: requests.Response,
) -> str:
    """
    Extract the text of a MarkLogic HTTP response that is expected to contain exactly one part.

    :param response: A multipart HTTP response

    :return: The text of the only part, or an empty string if the response contains no parts

    :raises MultipartResponseLongerThanExpected: If the response from MarkLogic has more than one part
    """
    texts = get_multipart_strings_from_marklogic_response(response)

    if not texts:
        # TODO: This should strictly speaking be None, but fixing this involves refactoring a lot of other stuff which
        # relies on "" being falsy.
        return ""

    if len(texts) == 1:
        return texts[0]

    raise MultipartResponseLongerThanExpected(
        f"Response returned {len(texts)} multipart items, expected 1",
    )
Given an HTTP response from a MarkLogic server, assuming the response contains a single part, extract the text content of the response.
Parameters
- response: A multipart HTTP response
Returns
The text of the response
Raises
- MultipartResponseLongerThanExpected: If the response from MarkLogic has more than one part
def get_single_bytestring_from_marklogic_response(
    response: requests.Response,
) -> bytes:
    """
    Extract the raw bytes of a MarkLogic HTTP response that is expected to contain exactly one part.

    :param response: A multipart HTTP response

    :return: The bytes of the only part, or an empty bytestring if the response contains no parts

    :raises MultipartResponseLongerThanExpected: If the response from MarkLogic has more than one part
    """
    chunks = get_multipart_bytes_from_marklogic_response(response)

    if not chunks:
        # TODO: This should strictly speaking be None, but fixing this involves refactoring a lot of other stuff which
        # relies on "" being falsy.
        return b""

    if len(chunks) == 1:
        return chunks[0]

    raise MultipartResponseLongerThanExpected(
        f"Response returned {len(chunks)} multipart items, expected 1",
    )
166class MarklogicApiClient: 167 """ 168 The base class for interacting with a MarkLogic instance. 169 """ 170 171 http_error_classes: dict[int, Type[MarklogicAPIError]] = { 172 400: MarklogicBadRequestError, 173 401: MarklogicUnauthorizedError, 174 403: MarklogicNotPermittedError, 175 404: MarklogicResourceNotFoundError, 176 504: GatewayTimeoutError, 177 } 178 error_code_classes: dict[str, Type[MarklogicAPIError]] = { 179 "XDMP-DOCNOTFOUND": MarklogicResourceNotFoundError, 180 "XDMP-LOCKCONFLICT": MarklogicResourceLockedError, 181 "XDMP-LOCKED": MarklogicResourceLockedError, 182 "DLS-UNMANAGED": MarklogicResourceUnmanagedError, 183 "DLS-NOTCHECKEDOUT": MarklogicResourceNotCheckedOutError, 184 "DLS-CHECKOUTCONFLICT": MarklogicCheckoutConflictError, 185 "SEC-PRIVDNE": MarklogicNotPermittedError, 186 "XDMP-VALIDATE.*": MarklogicValidationFailedError, 187 "FCL-DOCUMENTNOTFOUND.*": DocumentNotFoundError, 188 } 189 190 default_http_error_class = MarklogicCommunicationError 191 192 def __init__( 193 self, 194 host: str, 195 username: str, 196 password: str, 197 use_https: bool, 198 user_agent: str = DEFAULT_USER_AGENT, 199 ) -> None: 200 self.host = host 201 self.username = username 202 self.password = password 203 self.base_url = f"{'https' if use_https else 'http'}://{self.host}:8011" 204 # Apply auth / common headers to the session 205 self.session = requests.Session() 206 self.session.auth = HTTPBasicAuth(username, password) 207 self.session.headers.update({"User-Agent": user_agent}) 208 self.user_agent = user_agent 209 210 def get_press_summaries_for_document_uri( 211 self, 212 uri: DocumentURIString, 213 ) -> list[PressSummary]: 214 """ 215 Returns a list of PressSummary objects associated with a given Document URI 216 """ 217 vars: query_dicts.GetComponentsForDocumentDict = { 218 "parent_uri": uri, 219 "component": "pressSummary", 220 } 221 response = self._send_to_eval(vars, "get_components_for_document.xqy") 222 uris = 
get_multipart_strings_from_marklogic_response(response) 223 return [ 224 PressSummary(DocumentURIString(uri.strip("/").strip(".xml")), self) for uri in uris 225 ] # TODO: Migrate this strip behaviour into proper manipulation of a MarkLogicURIString 226 227 def get_document_by_uri( 228 self, 229 uri: DocumentURIString, 230 search_query: Optional[str] = None, 231 ) -> Document: 232 document_type_class = self.get_document_type_from_uri(uri) 233 return document_type_class(uri, self, search_query=search_query) 234 235 def get_document_type_from_uri(self, uri: DocumentURIString) -> Type[Document]: 236 vars: query_dicts.DocumentCollectionsDict = { 237 "uri": self._format_uri_for_marklogic(uri), 238 } 239 response = self._send_to_eval(vars, "document_collections.xqy") 240 collections = get_multipart_strings_from_marklogic_response(response) 241 242 if DOCUMENT_COLLECTION_URI_JUDGMENT in collections: 243 return Judgment 244 if DOCUMENT_COLLECTION_URI_PRESS_SUMMARY in collections: 245 return PressSummary 246 return Document 247 248 def _get_error_code_class(self, error_code: str) -> Type[MarklogicAPIError]: 249 """ 250 Get the exception type for a MarkLogic error code, or the first part of one 251 """ 252 for regex, error in self.error_code_classes.items(): 253 if re.fullmatch(regex, error_code): 254 return error 255 print(f"No error code match found for {error_code}") 256 return self.default_http_error_class 257 258 def _path_to_request_url(self, path: str) -> str: 259 return f"{self.base_url}/{path.lstrip('/')}" 260 261 @classmethod 262 def _get_error_code(cls, content_as_xml: Optional[str]) -> str: 263 logging.warning( 264 "XMLTools is deprecated and will be removed in later versions. 
" 265 "Use methods from MarklogicApiClient.Client instead.", 266 ) 267 if not content_as_xml: 268 return "Unknown error, Marklogic returned a null or empty response" 269 try: 270 xml = fromstring(content_as_xml) 271 return str( 272 xml.find( 273 "message-code", 274 namespaces={"": "http://marklogic.com/xdmp/error"}, 275 ).text 276 ) 277 except (ParseError, TypeError, AttributeError): 278 return "Unknown error, Marklogic returned a null or empty response" 279 280 def _raise_for_status(self, response: requests.Response) -> None: 281 try: 282 response.raise_for_status() 283 except requests.exceptions.HTTPError as e: 284 if e.response is None: 285 raise NoResponse 286 status_code = e.response.status_code 287 new_error_class = self.http_error_classes.get( 288 status_code, 289 self.default_http_error_class, 290 ) 291 try: 292 response_body = json.dumps(response.json(), indent=4) 293 except requests.JSONDecodeError: 294 response_body = response.text 295 296 if new_error_class == self.default_http_error_class: 297 # Attempt to decode the error code from the response 298 299 error_code = self._get_error_code(response.content.decode("utf-8")) 300 301 new_error_class = self._get_error_code_class(error_code) 302 303 new_exception = new_error_class( 304 f"{e}. Response body:\n{response_body}", 305 ) 306 new_exception.response = response 307 raise new_exception 308 309 def _format_uri_for_marklogic( 310 self, 311 uri: DocumentURIString, 312 ) -> MarkLogicDocumentURIString: 313 """ 314 MarkLogic requires a document URI that begins with a slash `/` and ends in `.xml`. This method ensures any takes 315 a `DocumentURIString` and converts it to a MarkLogic-friendly `MarkLogicDocumentURIString`. 316 317 :return: A `MarkLogicDocumentURIString` at which the document at the given `DocumentURIString` can be located 318 within MarkLogic. 
319 """ 320 return MarkLogicDocumentURIString(f"/{uri.lstrip('/').rstrip('/')}.xml") 321 322 def _xquery_path(self, xquery_file_name: str) -> str: 323 return os.path.join(ROOT_DIR, "xquery", xquery_file_name) 324 325 def _send_to_eval( 326 self, 327 vars: query_dicts.MarkLogicAPIDict, 328 xquery_file_name: str, 329 timeout: tuple[float, float] = (CONNECT_TIMEOUT, READ_TIMEOUT), 330 ) -> requests.Response: 331 return self.eval( 332 self._xquery_path(xquery_file_name), 333 vars=json.dumps(vars), 334 accept_header="application/xml", 335 timeout=timeout, 336 ) 337 338 def _eval_and_decode( 339 self, 340 vars: query_dicts.MarkLogicAPIDict, 341 xquery_file_name: str, 342 ) -> str: 343 response = self._send_to_eval(vars, xquery_file_name) 344 return get_single_string_from_marklogic_response(response) 345 346 def _eval_as_bytes( 347 self, 348 vars: query_dicts.MarkLogicAPIDict, 349 xquery_file_name: str, 350 ) -> bytes: 351 response = self._send_to_eval(vars, xquery_file_name) 352 return get_single_bytestring_from_marklogic_response(response) 353 354 def prepare_request_kwargs( 355 self, 356 method: str, 357 path: str, 358 body: Optional[str] = None, 359 data: Optional[dict[str, Any]] = None, 360 ) -> dict[str, Any]: 361 kwargs = dict(url=self._path_to_request_url(path)) 362 if data is not None: 363 data = {k: v for k, v in data.items() if v is not None} 364 if method == "GET": 365 kwargs["params"] = data # type: ignore 366 else: 367 kwargs["data"] = json.dumps(data) 368 if body is not None: 369 kwargs["data"] = body 370 return kwargs 371 372 def make_request( 373 self, 374 method: str, 375 path: str, 376 headers: CaseInsensitiveDict[Union[str, Any]], 377 body: Optional[str] = None, 378 data: Optional[dict[str, Any]] = None, 379 ) -> requests.Response: 380 kwargs = self.prepare_request_kwargs(method, path, body, data) 381 self.session.headers = headers 382 response = self.session.request(method, **kwargs) 383 # Raise relevant exception for an erroneous response 384 
self._raise_for_status(response) 385 return response 386 387 def GET(self, path: str, headers: dict[str, Any], **data: Any) -> requests.Response: 388 logging.warning("GET() is deprecated, use eval() or invoke()") 389 return self.make_request("GET", path, headers, data) # type: ignore 390 391 def POST( 392 self, 393 path: str, 394 headers: dict[str, Any], 395 **data: Any, 396 ) -> requests.Response: 397 logging.warning("POST() is deprecated, use eval() or invoke()") 398 return self.make_request("POST", path, headers, data) # type: ignore 399 400 def document_exists(self, document_uri: DocumentURIString) -> bool: 401 uri = self._format_uri_for_marklogic(document_uri) 402 vars: query_dicts.DocumentExistsDict = { 403 "uri": uri, 404 } 405 decoded_response = self._eval_and_decode(vars, "document_exists.xqy") 406 407 if decoded_response == "true": 408 return True 409 if decoded_response == "false": 410 return False 411 raise RuntimeError("Marklogic response was neither true nor false") 412 413 def get_judgment_xml_bytestring( 414 self, 415 judgment_uri: DocumentURIString, 416 version_uri: Optional[DocumentURIString] = None, 417 show_unpublished: bool = False, 418 search_query: Optional[str] = None, 419 ) -> bytes: 420 marklogic_document_uri = self._format_uri_for_marklogic(judgment_uri) 421 marklogic_document_version_uri = ( 422 MarkLogicDocumentVersionURIString( 423 self._format_uri_for_marklogic(version_uri), 424 ) 425 if version_uri 426 else None 427 ) 428 show_unpublished = self.verify_show_unpublished(show_unpublished) 429 430 vars: query_dicts.GetJudgmentDict = { 431 "uri": marklogic_document_uri, 432 "version_uri": marklogic_document_version_uri, 433 "show_unpublished": show_unpublished, 434 "search_query": search_query, 435 } 436 437 response = self._eval_as_bytes(vars, "get_judgment.xqy") 438 if not response: 439 raise MarklogicNotPermittedError( 440 "The document is not published and show_unpublished was not set", 441 ) 442 443 return response 444 445 def 
get_judgment_xml( 446 self, 447 judgment_uri: DocumentURIString, 448 version_uri: Optional[DocumentURIString] = None, 449 show_unpublished: bool = False, 450 search_query: Optional[str] = None, 451 ) -> str: 452 return self.get_judgment_xml_bytestring( 453 judgment_uri, 454 version_uri, 455 show_unpublished, 456 search_query=search_query, 457 ).decode(encoding="utf-8") 458 459 def set_document_name( 460 self, 461 document_uri: DocumentURIString, 462 content: str, 463 ) -> requests.Response: 464 uri = self._format_uri_for_marklogic(document_uri) 465 vars: query_dicts.SetMetadataNameDict = {"uri": uri, "content": content} 466 return self._send_to_eval(vars, "set_metadata_name.xqy") 467 468 def set_judgment_date( 469 self, 470 judgment_uri: DocumentURIString, 471 content: str, 472 ) -> requests.Response: 473 warnings.warn( 474 "set_judgment_date() is deprecated, use set_document_work_expression_date()", 475 DeprecationWarning, 476 stacklevel=2, 477 ) 478 return self.set_document_work_expression_date(judgment_uri, content) 479 480 def set_document_work_expression_date( 481 self, 482 document_uri: DocumentURIString, 483 content: str, 484 ) -> requests.Response: 485 uri = self._format_uri_for_marklogic(document_uri) 486 vars: query_dicts.SetMetadataWorkExpressionDateDict = { 487 "uri": uri, 488 "content": content, 489 } 490 491 return self._send_to_eval(vars, "set_metadata_work_expression_date.xqy") 492 493 def set_judgment_citation( 494 self, 495 judgment_uri: DocumentURIString, 496 content: str, 497 ) -> requests.Response: 498 uri = self._format_uri_for_marklogic(judgment_uri) 499 vars: query_dicts.SetMetadataCitationDict = { 500 "uri": uri, 501 "content": content.strip(), 502 } 503 504 return self._send_to_eval(vars, "set_metadata_citation.xqy") 505 506 def set_document_court( 507 self, 508 document_uri: DocumentURIString, 509 content: str, 510 ) -> requests.Response: 511 uri = self._format_uri_for_marklogic(document_uri) 512 vars: query_dicts.SetMetadataCourtDict = 
{"uri": uri, "content": content} 513 514 return self._send_to_eval(vars, "set_metadata_court.xqy") 515 516 def set_document_jurisdiction( 517 self, 518 document_uri: DocumentURIString, 519 content: str, 520 ) -> requests.Response: 521 uri = self._format_uri_for_marklogic(document_uri) 522 vars: query_dicts.SetMetadataJurisdictionDict = {"uri": uri, "content": content} 523 return self._send_to_eval(vars, "set_metadata_jurisdiction.xqy") 524 525 def set_document_court_and_jurisdiction( 526 self, 527 document_uri: DocumentURIString, 528 content: str, 529 ) -> requests.Response: 530 if "/" in content: 531 court, jurisdiction = re.split("\\s*/\\s*", content) 532 self.set_document_court(document_uri, court) 533 return self.set_document_jurisdiction(document_uri, jurisdiction) 534 self.set_document_court(document_uri, content) 535 return self.set_document_jurisdiction(document_uri, "") 536 537 def set_judgment_this_uri( 538 self, 539 judgment_uri: DocumentURIString, 540 ) -> requests.Response: 541 uri = self._format_uri_for_marklogic(judgment_uri) 542 content_with_id = f"https://caselaw.nationalarchives.gov.uk/id/{judgment_uri.lstrip('/')}" 543 content_without_id = f"https://caselaw.nationalarchives.gov.uk/{judgment_uri.lstrip('/')}" 544 content_with_xml = f"https://caselaw.nationalarchives.gov.uk/{judgment_uri.lstrip('/')}/data.xml" 545 vars: query_dicts.SetMetadataThisUriDict = { 546 "uri": uri, 547 "content_with_id": content_with_id, 548 "content_without_id": content_without_id, 549 "content_with_xml": content_with_xml, 550 } 551 552 return self._send_to_eval(vars, "set_metadata_this_uri.xqy") 553 554 def save_locked_judgment_xml( 555 self, 556 judgment_uri: DocumentURIString, 557 judgment_xml: bytes, 558 annotation: VersionAnnotation, 559 ) -> requests.Response: 560 """assumes the judgment is already locked, does not unlock/check in 561 note this version assumes the XML is raw bytes, rather than a tree...""" 562 563 validate_content_hash(judgment_xml) 564 uri = 
self._format_uri_for_marklogic(judgment_uri) 565 566 annotation.set_calling_function("save_locked_judgment_xml") 567 annotation.set_calling_agent(self.user_agent) 568 569 vars: query_dicts.UpdateLockedJudgmentDict = { 570 "uri": uri, 571 "judgment": judgment_xml.decode("utf-8"), 572 "annotation": annotation.as_json, 573 } 574 575 return self._send_to_eval(vars, "update_locked_judgment.xqy") 576 577 def insert_document_xml( 578 self, 579 document_uri: DocumentURIString, 580 document_xml: Element, 581 document_type: type[Document], 582 annotation: VersionAnnotation, 583 ) -> requests.Response: 584 """ 585 Insert a new XML document into MarkLogic. 586 587 :param document_uri: The URI to insert the document at 588 :param document_xml: The XML of the document to insert 589 :param document_type: The type class of the document 590 :param annotation: Annotations to record alongside this version 591 592 :return: The response object from MarkLogic 593 """ 594 xml = ElementTree.tostring(document_xml) 595 596 uri = self._format_uri_for_marklogic(document_uri) 597 598 annotation.set_calling_function("insert_document_xml") 599 annotation.set_calling_agent(self.user_agent) 600 601 vars: query_dicts.InsertDocumentDict = { 602 "uri": uri, 603 "type_collection": document_type.type_collection_name, 604 "document": xml.decode("utf-8"), 605 "annotation": annotation.as_json, 606 } 607 608 return self._send_to_eval(vars, "insert_document.xqy") 609 610 def update_document_xml( 611 self, 612 document_uri: DocumentURIString, 613 document_xml: Element, 614 annotation: VersionAnnotation, 615 ) -> requests.Response: 616 """ 617 Updates an existing XML document in MarkLogic with a new version. 618 619 This uses `dls:document-checkout-update-checkin` to perform this in a single operation. 
620 621 :param document_uri: The URI of the document to update 622 :param document_xml: The new XML content of the document 623 :param annotation: Annotations to record alongside this version 624 625 :return: The response object from MarkLogic 626 """ 627 xml = ElementTree.tostring(document_xml) 628 629 uri = self._format_uri_for_marklogic(document_uri) 630 631 annotation.set_calling_function("update_document_xml") 632 annotation.set_calling_agent(self.user_agent) 633 634 vars: query_dicts.UpdateDocumentDict = { 635 "uri": uri, 636 "judgment": xml.decode("utf-8"), 637 "annotation": annotation.as_json, 638 } 639 640 return self._send_to_eval(vars, "update_document.xqy") 641 642 def list_judgment_versions( 643 self, 644 judgment_uri: DocumentURIString, 645 ) -> requests.Response: 646 uri = self._format_uri_for_marklogic(judgment_uri) 647 vars: query_dicts.ListJudgmentVersionsDict = {"uri": uri} 648 649 return self._send_to_eval(vars, "list_judgment_versions.xqy") 650 651 def checkout_judgment( 652 self, 653 judgment_uri: DocumentURIString, 654 annotation: str = "", 655 expires_at_midnight: bool = False, 656 timeout_seconds: int = -1, 657 ) -> requests.Response: 658 """If timeout_seconds is -1, the lock never times out""" 659 uri = self._format_uri_for_marklogic(judgment_uri) 660 vars: query_dicts.CheckoutJudgmentDict = { 661 "uri": uri, 662 "annotation": annotation, 663 "timeout": timeout_seconds, 664 } 665 666 if expires_at_midnight: 667 timeout = self.calculate_seconds_until_midnight() 668 vars["timeout"] = timeout 669 670 return self._send_to_eval(vars, "checkout_judgment.xqy") 671 672 def checkin_judgment(self, judgment_uri: DocumentURIString) -> requests.Response: 673 uri = self._format_uri_for_marklogic(judgment_uri) 674 vars: query_dicts.CheckinJudgmentDict = {"uri": uri} 675 676 return self._send_to_eval(vars, "checkin_judgment.xqy") 677 678 def get_judgment_checkout_status( 679 self, 680 judgment_uri: DocumentURIString, 681 ) -> requests.Response: 682 uri = 
self._format_uri_for_marklogic(judgment_uri) 683 vars: query_dicts.GetJudgmentCheckoutStatusDict = {"uri": uri} 684 685 return self._send_to_eval(vars, "get_judgment_checkout_status.xqy") 686 687 def get_judgment_checkout_status_message( 688 self, 689 judgment_uri: DocumentURIString, 690 ) -> Optional[str]: 691 """Return the annotation of the lock or `None` if there is no lock.""" 692 response = self.get_judgment_checkout_status(judgment_uri) 693 if not response.content: 694 return None 695 content = decoder.MultipartDecoder.from_response(response).parts[0].text 696 if content == "": 697 return None 698 response_xml = ElementTree.fromstring(content) 699 return str( 700 response_xml.find( 701 "dls:annotation", 702 namespaces={"dls": "http://marklogic.com/xdmp/dls"}, 703 ).text 704 ) 705 706 def get_judgment_version( 707 self, 708 judgment_uri: DocumentURIString, 709 version: int, 710 ) -> requests.Response: 711 uri = self._format_uri_for_marklogic(judgment_uri) 712 vars: query_dicts.GetJudgmentVersionDict = {"uri": uri, "version": str(version)} 713 714 return self._send_to_eval(vars, "get_judgment_version.xqy") 715 716 def validate_document(self, document_uri: DocumentURIString) -> bool: 717 vars: query_dicts.ValidateDocumentDict = { 718 "uri": self._format_uri_for_marklogic(document_uri), 719 } 720 response = self._send_to_eval(vars, "validate_document.xqy") 721 content = decoder.MultipartDecoder.from_response(response).parts[0].text 722 xml = ElementTree.fromstring(content) 723 return ( 724 len( 725 xml.findall( 726 ".//error:error", 727 {"error": "http://marklogic.com/xdmp/error"}, 728 ), 729 ) 730 == 0 731 ) 732 733 def has_unique_content_hash(self, judgment_uri: DocumentURIString) -> bool: 734 """ 735 Returns True if the content hash for this document is unique (not shared with other documents). 
736 """ 737 uri = self._format_uri_for_marklogic(judgment_uri) 738 vars: CheckContentHashUniqueByUriDict = {"uri": uri} 739 return self._eval_and_decode(vars, "check_content_hash_unique_by_uri.xqy") == "true" 740 741 def eval( 742 self, 743 xquery_path: str, 744 vars: str, 745 accept_header: str = "multipart/mixed", 746 timeout: tuple[float, float] = (CONNECT_TIMEOUT, READ_TIMEOUT), 747 ) -> requests.Response: 748 headers = { 749 "Content-type": "application/x-www-form-urlencoded", 750 "Accept": accept_header, 751 } 752 data = { 753 "xquery": Path(xquery_path).read_text(), 754 "vars": vars, 755 } 756 path = "LATEST/eval" 757 758 if DEBUG: 759 print(f"Sending {vars} to {xquery_path}") 760 761 response = self.session.request( 762 "POST", 763 url=self._path_to_request_url(path), 764 headers=headers, 765 data=data, 766 timeout=timeout, 767 ) 768 # Raise relevant exception for an erroneous response 769 self._raise_for_status(response) 770 return response 771 772 def invoke( 773 self, 774 module: str, 775 vars: str, 776 accept_header: str = "multipart/mixed", 777 ) -> requests.Response: 778 headers = { 779 "Content-type": "application/x-www-form-urlencoded", 780 "Accept": accept_header, 781 } 782 data = { 783 "module": module, 784 "vars": vars, 785 } 786 path = "LATEST/invoke" 787 response = self.session.request( 788 "POST", 789 url=self._path_to_request_url(path), 790 headers=headers, 791 data=data, 792 ) 793 # Raise relevant exception for an erroneous response 794 self._raise_for_status(response) 795 return response 796 797 def advanced_search(self, search_parameters: SearchParameters) -> requests.Response: 798 """ 799 Performs a search on the entire document set. 
800 801 :param query: 802 :param court: 803 :param judge: 804 :param party: 805 :param neutral_citation: 806 :param document_name: 807 :param consignment_number: 808 :param specific_keyword: 809 :param order: 810 :param date_from: 811 :param date_to: 812 :param page: 813 :param page_size: 814 :param show_unpublished: If True, both published and unpublished documents will be returned 815 :param only_unpublished: If True, will only return published documents. Ignores the value of show_unpublished 816 :param collections: 817 :return: 818 """ 819 module = "/judgments/search/search-v2.xqy" # as stored on Marklogic 820 search_parameters.show_unpublished = self.verify_show_unpublished( 821 search_parameters.show_unpublished, 822 ) 823 vars = json.dumps(search_parameters.as_marklogic_payload()) 824 return self.invoke(module, vars) 825 826 def eval_xslt( 827 self, 828 judgment_uri: DocumentURIString, 829 version_uri: Optional[DocumentURIString] = None, 830 show_unpublished: bool = False, 831 xsl_filename: str = DEFAULT_XSL_TRANSFORM, 832 query: Optional[str] = None, 833 ) -> requests.Response: 834 marklogic_document_uri = self._format_uri_for_marklogic(judgment_uri) 835 marklogic_document_version_uri = ( 836 MarkLogicDocumentVersionURIString( 837 self._format_uri_for_marklogic(version_uri), 838 ) 839 if version_uri 840 else None 841 ) 842 843 image_location = os.getenv("XSLT_IMAGE_LOCATION", "") 844 845 show_unpublished = self.verify_show_unpublished(show_unpublished) 846 847 vars: query_dicts.XsltTransformDict = { 848 "uri": marklogic_document_uri, 849 "version_uri": marklogic_document_version_uri, 850 "show_unpublished": show_unpublished, 851 "img_location": image_location, 852 "xsl_filename": xsl_filename, 853 "query": query, 854 } 855 856 return self._send_to_eval(vars, "xslt_transform.xqy") 857 858 def accessible_judgment_transformation( 859 self, 860 judgment_uri: DocumentURIString, 861 version_uri: Optional[DocumentURIString] = None, 862 show_unpublished: bool = 
False, 863 ) -> requests.Response: 864 return self.eval_xslt( 865 judgment_uri, 866 version_uri, 867 show_unpublished, 868 xsl_filename=DEFAULT_XSL_TRANSFORM, 869 ) 870 871 def original_judgment_transformation( 872 self, 873 judgment_uri: DocumentURIString, 874 version_uri: Optional[DocumentURIString] = None, 875 show_unpublished: bool = False, 876 ) -> requests.Response: 877 return self.eval_xslt( 878 judgment_uri, 879 version_uri, 880 show_unpublished, 881 xsl_filename="as-handed-down.xsl", 882 ) 883 884 def get_property(self, judgment_uri: DocumentURIString, name: str) -> str: 885 uri = self._format_uri_for_marklogic(judgment_uri) 886 vars: query_dicts.GetPropertyDict = { 887 "uri": uri, 888 "name": name, 889 } 890 return self._eval_and_decode(vars, "get_property.xqy") 891 892 def get_property_as_node(self, judgment_uri: DocumentURIString, name: str) -> Optional[etree._Element]: 893 uri = self._format_uri_for_marklogic(judgment_uri) 894 vars: query_dicts.GetPropertyAsNodeDict = { 895 "uri": uri, 896 "name": name, 897 } 898 value = self._eval_and_decode(vars, "get_property_as_node.xqy") 899 if not value: 900 return None 901 return etree.fromstring(value) 902 903 def get_version_annotation(self, judgment_uri: DocumentURIString) -> str: 904 uri = self._format_uri_for_marklogic(judgment_uri) 905 vars: query_dicts.GetVersionAnnotationDict = { 906 "uri": uri, 907 } 908 return self._eval_and_decode(vars, "get_version_annotation.xqy") 909 910 def get_version_created_datetime(self, judgment_uri: DocumentURIString) -> datetime: 911 uri = self._format_uri_for_marklogic(judgment_uri) 912 vars: query_dicts.GetVersionCreatedDict = { 913 "uri": uri, 914 } 915 return datetime.strptime( 916 self._eval_and_decode(vars, "get_version_created.xqy"), 917 "%Y-%m-%dT%H:%M:%S.%f%z", 918 ) 919 920 def set_property( 921 self, 922 judgment_uri: DocumentURIString, 923 name: str, 924 value: str, 925 ) -> requests.Response: 926 uri = self._format_uri_for_marklogic(judgment_uri) 927 vars: 
query_dicts.SetPropertyDict = { 928 "uri": uri, 929 "value": value, 930 "name": name, 931 } 932 933 return self._send_to_eval(vars, "set_property.xqy") 934 935 def set_property_as_node( 936 self, 937 judgment_uri: DocumentURIString, 938 name: str, 939 value: etree._Element, 940 ) -> requests.Response: 941 """Given a root node, set the value of the MarkLogic property for a document to the _contents_ of that root node. The root node itself is discarded.""" 942 uri = self._format_uri_for_marklogic(judgment_uri) 943 vars: query_dicts.SetPropertyAsNodeDict = { 944 "uri": uri, 945 "value": etree.tostring(value).decode(), 946 "name": name, 947 } 948 949 return self._send_to_eval(vars, "set_property_as_node.xqy") 950 951 def set_boolean_property( 952 self, 953 judgment_uri: DocumentURIString, 954 name: str, 955 value: bool, 956 ) -> requests.Response: 957 uri = self._format_uri_for_marklogic(judgment_uri) 958 string_value = "true" if value else "false" 959 vars: query_dicts.SetBooleanPropertyDict = { 960 "uri": uri, 961 "value": string_value, 962 "name": name, 963 } 964 """ 965 Set a property within MarkLogic which is specifically a boolean. 966 967 Since XML has no concept of boolean, the actual value in the database is set to `"true"` or `"false"`. 968 """ 969 return self._send_to_eval(vars, "set_boolean_property.xqy") 970 971 def get_boolean_property(self, judgment_uri: DocumentURIString, name: str) -> bool: 972 """ 973 Get a property from MarkLogic which is specifically a boolean. 
974 975 :return: `True` if the property exists and has a value of `"true"`, otherwise `False` 976 """ 977 content = self.get_property(judgment_uri, name) 978 return content == "true" 979 980 def set_datetime_property( 981 self, 982 judgment_uri: DocumentURIString, 983 name: str, 984 value: datetime, 985 ) -> requests.Response: 986 """Set a property within MarkLogic which is specifically a datetime.""" 987 uri = self._format_uri_for_marklogic(judgment_uri) 988 vars: query_dicts.SetDatetimePropertyDict = { 989 "uri": uri, 990 "value": value.isoformat(), 991 "name": name, 992 } 993 return self._send_to_eval(vars, "set_datetime_property.xqy") 994 995 def get_datetime_property(self, judgment_uri: DocumentURIString, name: str) -> Optional[datetime]: 996 """ 997 Get a property from MarkLogic which is specifically a datetime. 998 999 :return: A datetime with the value of the property, or `None` if it does not exist 1000 """ 1001 content = self.get_property(judgment_uri, name) 1002 1003 if content: 1004 return isoparse(content) 1005 1006 return None 1007 1008 def set_published( 1009 self, 1010 judgment_uri: DocumentURIString, 1011 published: bool, 1012 ) -> requests.Response: 1013 return self.set_boolean_property(judgment_uri, "published", published) 1014 1015 def get_published(self, judgment_uri: DocumentURIString) -> bool: 1016 return self.get_boolean_property(judgment_uri, "published") 1017 1018 def get_last_modified(self, judgment_uri: DocumentURIString) -> str: 1019 uri = self._format_uri_for_marklogic(judgment_uri) 1020 vars: query_dicts.GetLastModifiedDict = { 1021 "uri": uri, 1022 } 1023 1024 response = self._send_to_eval(vars, "get_last_modified.xqy") 1025 1026 if not response.text: 1027 return "" 1028 1029 content = str(decoder.MultipartDecoder.from_response(response).parts[0].text) 1030 return content 1031 1032 def delete_judgment(self, judgment_uri: DocumentURIString) -> requests.Response: 1033 uri = self._format_uri_for_marklogic(judgment_uri) 1034 vars: 
query_dicts.DeleteJudgmentDict = {"uri": uri} 1035 return self._send_to_eval(vars, "delete_judgment.xqy") 1036 1037 def copy_document( 1038 self, 1039 old: DocumentURIString, 1040 new: DocumentURIString, 1041 ) -> requests.Response: 1042 old_uri = self._format_uri_for_marklogic(old) 1043 new_uri = self._format_uri_for_marklogic(new) 1044 1045 vars: query_dicts.CopyDocumentDict = { 1046 "old_uri": old_uri, 1047 "new_uri": new_uri, 1048 } 1049 return self._send_to_eval(vars, "copy_document.xqy") 1050 1051 def break_checkout(self, judgment_uri: DocumentURIString) -> requests.Response: 1052 uri = self._format_uri_for_marklogic(judgment_uri) 1053 vars: query_dicts.BreakJudgmentCheckoutDict = { 1054 "uri": uri, 1055 } 1056 return self._send_to_eval(vars, "break_judgment_checkout.xqy") 1057 1058 def user_has_privilege( 1059 self, 1060 username: str, 1061 privilege_uri: MarkLogicPrivilegeURIString, 1062 privilege_action: str, 1063 ) -> requests.Response: 1064 vars: query_dicts.UserHasPrivilegeDict = { 1065 "user": username, 1066 "privilege_uri": privilege_uri, 1067 "privilege_action": privilege_action, 1068 } 1069 return self._send_to_eval(vars, "user_has_privilege.xqy") 1070 1071 def user_can_view_unpublished_judgments(self, username: str) -> bool: 1072 if self.user_has_admin_role(username): 1073 return True 1074 1075 check_privilege = self.user_has_privilege( 1076 username, 1077 MarkLogicPrivilegeURIString( 1078 "https://caselaw.nationalarchives.gov.uk/custom/privileges/can-view-unpublished-documents", 1079 ), 1080 "execute", 1081 ) 1082 return get_single_string_from_marklogic_response(check_privilege).lower() == "true" 1083 1084 def user_has_role(self, username: str, role: str) -> requests.Response: 1085 vars: query_dicts.UserHasRoleDict = { 1086 "user": username, 1087 "role": role, 1088 } 1089 return self._send_to_eval(vars, "user_has_role.xqy") 1090 1091 def user_has_admin_role(self, username: str) -> bool: 1092 check_role = self.user_has_role( 1093 username, 1094 
"admin", 1095 ) 1096 multipart_data = decoder.MultipartDecoder.from_response(check_role) 1097 result = str(multipart_data.parts[0].text) 1098 return result.lower() == "true" 1099 1100 def calculate_seconds_until_midnight(self, now: Optional[datetime] = None) -> int: 1101 """ 1102 Get timedelta until end of day on the datetime passed, or current time. 1103 https://stackoverflow.com/questions/45986035/seconds-until-end-of-day-in-python 1104 """ 1105 if not now: 1106 now = datetime.now() 1107 tomorrow = now + timedelta(days=1) 1108 difference = datetime.combine(tomorrow, time.min) - now 1109 1110 return difference.seconds 1111 1112 def verify_show_unpublished(self, show_unpublished: bool) -> bool: 1113 if show_unpublished and not self.user_can_view_unpublished_judgments( 1114 self.username, 1115 ): 1116 return False 1117 return show_unpublished 1118 1119 def get_properties_for_search_results( 1120 self, 1121 judgment_uris: list[DocumentURIString], 1122 ) -> str: 1123 uris = [self._format_uri_for_marklogic(judgment_uri) for judgment_uri in judgment_uris] 1124 vars: query_dicts.GetPropertiesForSearchResultsDict = {"uris": uris} 1125 response = self._send_to_eval(vars, "get_properties_for_search_results.xqy") 1126 return get_single_string_from_marklogic_response(response) 1127 1128 def search_and_decode_response(self, search_parameters: SearchParameters) -> bytes: 1129 response = self.advanced_search(search_parameters) 1130 return get_single_bytestring_from_marklogic_response(response) 1131 1132 def search_judgments_and_decode_response( 1133 self, 1134 search_parameters: SearchParameters, 1135 ) -> bytes: 1136 search_parameters.collections = [DOCUMENT_COLLECTION_URI_JUDGMENT] 1137 return self.search_and_decode_response(search_parameters) 1138 1139 def update_document_uri(self, old_uri: DocumentURIString, new_citation: NeutralCitationString) -> DocumentURIString: 1140 """ 1141 Move the document at old_uri to the correct location based on the neutral citation 1142 The new 
neutral citation *must* not already exist (that is handled elsewhere) 1143 This might not be needed; changing the URI/neutral citation is vanishingly rare 1144 """ 1145 return move.update_document_uri(old_uri, new_citation, api_client=self) 1146 1147 def get_combined_stats_table(self) -> list[list[Any]]: 1148 """Run the combined statistics table xquery and return the result as a list of lists, each representing a table 1149 row.""" 1150 results: list[list[Any]] = json.loads( 1151 get_single_string_from_marklogic_response( 1152 self._send_to_eval({}, "get_combined_stats_table.xqy"), 1153 ), 1154 ) 1155 1156 return results 1157 1158 def get_highest_enrichment_version(self) -> tuple[int, int]: 1159 """This gets the highest enrichment version in the database, 1160 so if nothing has been enriched with the most recent version of enrichment, 1161 this won't reflect that change.""" 1162 table = json.loads( 1163 get_single_string_from_marklogic_response( 1164 self._send_to_eval( 1165 {}, 1166 "get_highest_enrichment_version.xqy", 1167 ), 1168 ), 1169 ) 1170 1171 return (int(table[1][1]), int(table[1][2])) 1172 1173 def get_pending_enrichment_for_version( 1174 self, 1175 target_enrichment_version: tuple[int, int], 1176 target_parser_version: tuple[int, int], 1177 maximum_records: int = 1000, 1178 ) -> list[list[Any]]: 1179 """Retrieve documents which are not yet enriched with a given version.""" 1180 vars: query_dicts.GetPendingEnrichmentForVersionDict = { 1181 "target_enrichment_major_version": target_enrichment_version[0], 1182 "target_enrichment_minor_version": target_enrichment_version[1], 1183 "target_parser_major_version": target_parser_version[0], 1184 "target_parser_minor_version": target_parser_version[1], 1185 "maximum_records": maximum_records, 1186 } 1187 results: list[list[Any]] = json.loads( 1188 get_single_string_from_marklogic_response( 1189 self._send_to_eval( 1190 vars, 1191 "get_pending_enrichment_for_version.xqy", 1192 ), 1193 ), 1194 ) 1195 1196 return 
results 1197 1198 def get_recently_enriched( 1199 self, 1200 ) -> list[list[Any]]: 1201 """Retrieve documents which are not yet enriched with a given version.""" 1202 results: list[list[Any]] = json.loads( 1203 get_single_string_from_marklogic_response( 1204 self._send_to_eval( 1205 {}, 1206 "get_recently_enriched.xqy", 1207 ), 1208 ), 1209 ) 1210 1211 return results 1212 1213 def get_highest_parser_version(self) -> tuple[int, int]: 1214 """This gets the highest parser version in the database, so if nothing has been parsed with the most recent version of the parser, this won't reflect that change.""" 1215 table = json.loads( 1216 get_single_string_from_marklogic_response( 1217 self._send_to_eval( 1218 {}, 1219 "get_highest_parser_version.xqy", 1220 ), 1221 ), 1222 ) 1223 1224 return (int(table[1][1]), int(table[1][2])) 1225 1226 def get_documents_pending_parse_for_version( 1227 self, 1228 target_version: tuple[int, int], 1229 maximum_records: int = 1000, 1230 ) -> list[list[Any]]: 1231 """Retrieve a list of documents which are not yet parsed with a given version.""" 1232 vars: query_dicts.GetPendingParseForVersionDocumentsDict = { 1233 "target_major_version": target_version[0], 1234 "target_minor_version": target_version[1], 1235 "maximum_records": maximum_records, 1236 } 1237 results: list[list[Any]] = json.loads( 1238 get_single_string_from_marklogic_response( 1239 self._send_to_eval( 1240 vars, 1241 "get_pending_parse_for_version_documents.xqy", 1242 ), 1243 ), 1244 ) 1245 1246 return results 1247 1248 def get_count_pending_parse_for_version( 1249 self, 1250 target_version: tuple[int, int], 1251 ) -> int: 1252 """Get the total number of documents which are not yet parsed with a given version.""" 1253 vars: query_dicts.GetPendingParseForVersionCountDict = { 1254 "target_major_version": target_version[0], 1255 "target_minor_version": target_version[1], 1256 } 1257 results = json.loads( 1258 get_single_string_from_marklogic_response( 1259 self._send_to_eval( 1260 
vars, 1261 "get_pending_parse_for_version_count.xqy", 1262 ), 1263 ), 1264 ) 1265 1266 return int(results[1][0]) 1267 1268 def get_recently_parsed( 1269 self, 1270 ) -> list[list[Any]]: 1271 """Retrieve documents which are not yet enriched with a given version.""" 1272 results: list[list[Any]] = json.loads( 1273 get_single_string_from_marklogic_response( 1274 self._send_to_eval( 1275 {}, 1276 "get_recently_parsed.xqy", 1277 ), 1278 ), 1279 ) 1280 1281 return results 1282 1283 def get_locked_documents( 1284 self, 1285 ) -> list[DocumentLock]: 1286 """Retrieve all currently locked documents.""" 1287 results = [ 1288 DocumentLock.from_string(lock) 1289 for lock in get_multipart_strings_from_marklogic_response( 1290 self._send_to_eval({}, "get_locked_documents.xqy") 1291 ) 1292 ] 1293 1294 return sorted(results, key=lambda lock: lock.timestamp) 1295 1296 def get_missing_fclid( 1297 self, 1298 maximum_records: int = 50, 1299 ) -> list[str]: 1300 """Retrieve the URIs of published documents which do not have an identifier in the `fclid` schema.""" 1301 vars: query_dicts.GetMissingFclidDict = { 1302 "maximum_records": maximum_records, 1303 } 1304 1305 results: list[str] = get_multipart_strings_from_marklogic_response( 1306 self._send_to_eval( 1307 vars, 1308 "get_missing_fclid.xqy", 1309 ) 1310 ) 1311 1312 return results 1313 1314 def resolve_from_identifier_slug( 1315 self, identifier_slug: DocumentIdentifierSlug, published_only: bool = True 1316 ) -> IdentifierResolutions: 1317 """Given a PUI/EUI url, look up the precomputed slug and return the 1318 MarkLogic document URIs which match that slug. 
Multiple returns should be anticipated""" 1319 vars: query_dicts.ResolveFromIdentifierSlugDict = { 1320 "identifier_slug": identifier_slug, 1321 "published_only": int(published_only), 1322 } 1323 raw_results: list[str] = get_multipart_strings_from_marklogic_response( 1324 self._send_to_eval( 1325 vars, 1326 "resolve_from_identifier_slug.xqy", 1327 ), 1328 ) 1329 return IdentifierResolutions.from_marklogic_output(raw_results) 1330 1331 def resolve_from_identifier_value( 1332 self, identifier_value: DocumentIdentifierValue, published_only: bool = True 1333 ) -> IdentifierResolutions: 1334 """Given a PUI/EUI url, look up the precomputed slug and return the 1335 MarkLogic document URIs which match that slug. Multiple returns should be anticipated""" 1336 vars: query_dicts.ResolveFromIdentifierValueDict = { 1337 "identifier_value": identifier_value, 1338 "published_only": int(published_only), 1339 } 1340 raw_results: list[str] = get_multipart_strings_from_marklogic_response( 1341 self._send_to_eval( 1342 vars, 1343 "resolve_from_identifier_value.xqy", 1344 ), 1345 ) 1346 return IdentifierResolutions.from_marklogic_output(raw_results) 1347 1348 def get_next_document_sequence_number(self) -> int: 1349 """Increment the MarkLogic sequence number by one and return the value.""" 1350 return int(self._eval_and_decode({}, "get_next_document_sequence_number.xqy"))
The base class for interacting with a MarkLogic instance.
def __init__(
    self,
    host: str,
    username: str,
    password: str,
    use_https: bool,
    user_agent: str = DEFAULT_USER_AGENT,
) -> None:
    """
    Store the connection details and build an authenticated `requests` session.

    :param host: Hostname of the MarkLogic server; port 8011 is always appended
    :param username: Username used for HTTP basic auth on the session
    :param password: Password used for HTTP basic auth on the session
    :param use_https: If True the base URL uses `https`, otherwise `http`
    :param user_agent: Value set as the session's `User-Agent` header
    """
    scheme = "https" if use_https else "http"

    self.host = host
    self.username = username
    self.password = password
    self.user_agent = user_agent
    self.base_url = f"{scheme}://{self.host}:8011"

    # A single shared session carries the credentials and the common headers.
    session = requests.Session()
    session.auth = HTTPBasicAuth(username, password)
    session.headers.update({"User-Agent": user_agent})
    self.session = session
def get_press_summaries_for_document_uri(
    self,
    uri: DocumentURIString,
) -> list[PressSummary]:
    """
    Returns a list of PressSummary objects associated with a given Document URI

    :param uri: The URI of the parent document

    :return: One `PressSummary` for each URI returned by `get_components_for_document.xqy`
    """
    vars: query_dicts.GetComponentsForDocumentDict = {
        "parent_uri": uri,
        "component": "pressSummary",
    }
    response = self._send_to_eval(vars, "get_components_for_document.xqy")
    component_uris = get_multipart_strings_from_marklogic_response(response)

    # `str.strip(".xml")` treats its argument as a *set of characters* and trims
    # them from both ends, so it could eat leading/trailing "x", "m", "l" or "."
    # belonging to the URI itself. `removesuffix` drops only the literal extension.
    return [
        PressSummary(
            DocumentURIString(component_uri.strip("/").removesuffix(".xml")),
            self,
        )
        for component_uri in component_uris
    ]  # TODO: Migrate this strip behaviour into proper manipulation of a MarkLogicURIString
Returns a list of PressSummary objects associated with a given Document URI
def get_document_type_from_uri(self, uri: DocumentURIString) -> Type[Document]:
    """
    Work out which document class applies to the document at `uri`.

    The document's collections are fetched with `document_collections.xqy`.
    The judgment collection is checked before the press summary collection;
    if neither is present the generic `Document` class is returned.

    :param uri: The URI of the document to inspect

    :return: `Judgment`, `PressSummary` or `Document`
    """
    vars: query_dicts.DocumentCollectionsDict = {
        "uri": self._format_uri_for_marklogic(uri),
    }
    collections = get_multipart_strings_from_marklogic_response(
        self._send_to_eval(vars, "document_collections.xqy"),
    )

    # Order matters: a document in both collections is treated as a judgment.
    candidates = (
        (DOCUMENT_COLLECTION_URI_JUDGMENT, Judgment),
        (DOCUMENT_COLLECTION_URI_PRESS_SUMMARY, PressSummary),
    )
    for collection_uri, document_class in candidates:
        if collection_uri in collections:
            return document_class
    return Document
def prepare_request_kwargs(
    self,
    method: str,
    path: str,
    body: Optional[str] = None,
    data: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """
    Build the keyword arguments for a `session.request` call.

    :param method: HTTP method; `"GET"` sends `data` as query parameters,
        any other method sends it as a JSON-encoded request body
    :param path: Path passed to `_path_to_request_url` to build the full URL
    :param body: Raw request body; when given it replaces any body derived from `data`
    :param data: Key/value payload; entries whose value is `None` are dropped

    :return: A dict containing `url` and, where applicable, `params` and/or `data`
    """
    request_kwargs: dict[str, Any] = {"url": self._path_to_request_url(path)}

    if data is not None:
        present = {key: value for key, value in data.items() if value is not None}
        if method == "GET":
            request_kwargs["params"] = present
        else:
            request_kwargs["data"] = json.dumps(present)

    # An explicit body always wins over a JSON-encoded `data` payload.
    if body is not None:
        request_kwargs["data"] = body

    return request_kwargs
def make_request(
    self,
    method: str,
    path: str,
    headers: CaseInsensitiveDict[Union[str, Any]],
    body: Optional[str] = None,
    data: Optional[dict[str, Any]] = None,
) -> requests.Response:
    """
    Send a request to MarkLogic through the shared session.

    :param method: HTTP method to use
    :param path: Path of the endpoint, passed to `prepare_request_kwargs`
    :param headers: Headers to send with this request
    :param body: Raw request body, passed to `prepare_request_kwargs`
    :param data: Key/value payload, passed to `prepare_request_kwargs`

    :return: The response object from MarkLogic

    Any exception raised by `_raise_for_status` for an erroneous response
    propagates to the caller.
    """
    kwargs = self.prepare_request_kwargs(method, path, body, data)
    # Pass the headers per request instead of assigning them to
    # `self.session.headers`: assignment replaced the session defaults (including
    # the User-Agent set in `__init__`) for every later request on this client.
    response = self.session.request(method, headers=headers, **kwargs)
    # Raise relevant exception for an erroneous response
    self._raise_for_status(response)
    return response
def document_exists(self, document_uri: DocumentURIString) -> bool:
    """
    Report whether `document_exists.xqy` says the document exists.

    :param document_uri: The URI of the document to check

    :return: `True` for a decoded response of `"true"`, `False` for `"false"`

    :raises RuntimeError: If the decoded response is neither `"true"` nor `"false"`
    """
    query_vars: query_dicts.DocumentExistsDict = {
        "uri": self._format_uri_for_marklogic(document_uri),
    }
    decoded = self._eval_and_decode(query_vars, "document_exists.xqy")

    outcome = {"true": True, "false": False}.get(decoded)
    if outcome is None:
        raise RuntimeError("Marklogic response was neither true nor false")
    return outcome
def get_judgment_xml_bytestring(
    self,
    judgment_uri: DocumentURIString,
    version_uri: Optional[DocumentURIString] = None,
    show_unpublished: bool = False,
    search_query: Optional[str] = None,
) -> bytes:
    """
    Fetch a document's XML as bytes by running `get_judgment.xqy`.

    :param judgment_uri: The URI of the document to fetch
    :param version_uri: URI of a specific version to fetch, if any
    :param show_unpublished: Requested visibility of unpublished documents; the
        value actually sent is whatever `verify_show_unpublished` returns
    :param search_query: Optional search query forwarded to the XQuery

    :return: The bytes returned by the query

    :raises MarklogicNotPermittedError: If the query returns an empty result
    """
    document_uri_for_marklogic = self._format_uri_for_marklogic(judgment_uri)

    version_uri_for_marklogic: Optional[MarkLogicDocumentVersionURIString] = None
    if version_uri:
        version_uri_for_marklogic = MarkLogicDocumentVersionURIString(
            self._format_uri_for_marklogic(version_uri),
        )

    permitted_show_unpublished = self.verify_show_unpublished(show_unpublished)

    query_vars: query_dicts.GetJudgmentDict = {
        "uri": document_uri_for_marklogic,
        "version_uri": version_uri_for_marklogic,
        "show_unpublished": permitted_show_unpublished,
        "search_query": search_query,
    }

    xml_bytes = self._eval_as_bytes(query_vars, "get_judgment.xqy")
    if xml_bytes:
        return xml_bytes

    raise MarklogicNotPermittedError(
        "The document is not published and show_unpublished was not set",
    )
def get_judgment_xml(
    self,
    judgment_uri: DocumentURIString,
    version_uri: Optional[DocumentURIString] = None,
    show_unpublished: bool = False,
    search_query: Optional[str] = None,
) -> str:
    """
    Fetch a document's XML as a string.

    Delegates to `get_judgment_xml_bytestring` with the same arguments and
    decodes the result as UTF-8.

    :return: The document XML as a `str`
    """
    xml_bytes = self.get_judgment_xml_bytestring(
        judgment_uri,
        version_uri,
        show_unpublished,
        search_query=search_query,
    )
    return xml_bytes.decode(encoding="utf-8")
def set_document_name(
    self,
    document_uri: DocumentURIString,
    content: str,
) -> requests.Response:
    """
    Run `set_metadata_name.xqy` for a document.

    :param document_uri: The URI of the document to update
    :param content: The value sent to the query as `content`

    :return: The response object from MarkLogic
    """
    query_vars: query_dicts.SetMetadataNameDict = {
        "uri": self._format_uri_for_marklogic(document_uri),
        "content": content,
    }
    return self._send_to_eval(query_vars, "set_metadata_name.xqy")
def set_judgment_date(
    self,
    judgment_uri: DocumentURIString,
    content: str,
) -> requests.Response:
    """
    Deprecated alias for `set_document_work_expression_date`.

    Emits a `DeprecationWarning` attributed to the caller, then forwards both
    arguments unchanged.

    :return: The response object from `set_document_work_expression_date`
    """
    deprecation_message = "set_judgment_date() is deprecated, use set_document_work_expression_date()"
    # stacklevel=2 points the warning at the code calling this method.
    warnings.warn(deprecation_message, DeprecationWarning, stacklevel=2)
    return self.set_document_work_expression_date(judgment_uri, content)
def set_document_work_expression_date(
    self,
    document_uri: DocumentURIString,
    content: str,
) -> requests.Response:
    """
    Run `set_metadata_work_expression_date.xqy` for a document.

    :param document_uri: The URI of the document to update
    :param content: The value sent to the query as `content`

    :return: The response object from MarkLogic
    """
    query_vars: query_dicts.SetMetadataWorkExpressionDateDict = {
        "uri": self._format_uri_for_marklogic(document_uri),
        "content": content,
    }
    return self._send_to_eval(query_vars, "set_metadata_work_expression_date.xqy")
def set_judgment_citation(
    self,
    judgment_uri: DocumentURIString,
    content: str,
) -> requests.Response:
    """
    Run `set_metadata_citation.xqy` for a document.

    :param judgment_uri: The URI of the document to update
    :param content: The citation; surrounding whitespace is stripped before sending

    :return: The response object from MarkLogic
    """
    query_vars: query_dicts.SetMetadataCitationDict = {
        "uri": self._format_uri_for_marklogic(judgment_uri),
        "content": content.strip(),
    }
    return self._send_to_eval(query_vars, "set_metadata_citation.xqy")
def set_document_court(
    self,
    document_uri: DocumentURIString,
    content: str,
) -> requests.Response:
    """
    Run `set_metadata_court.xqy` for a document.

    :param document_uri: The URI of the document to update
    :param content: The value sent to the query as `content`

    :return: The response object from MarkLogic
    """
    query_vars: query_dicts.SetMetadataCourtDict = {
        "uri": self._format_uri_for_marklogic(document_uri),
        "content": content,
    }
    return self._send_to_eval(query_vars, "set_metadata_court.xqy")
def set_document_jurisdiction(
    self,
    document_uri: DocumentURIString,
    content: str,
) -> requests.Response:
    """
    Run `set_metadata_jurisdiction.xqy` for a document.

    :param document_uri: The URI of the document to update
    :param content: The value sent to the query as `content`

    :return: The response object from MarkLogic
    """
    query_vars: query_dicts.SetMetadataJurisdictionDict = {
        "uri": self._format_uri_for_marklogic(document_uri),
        "content": content,
    }
    return self._send_to_eval(query_vars, "set_metadata_jurisdiction.xqy")
def set_document_court_and_jurisdiction(
    self,
    document_uri: DocumentURIString,
    content: str,
) -> requests.Response:
    """
    Set both court and jurisdiction from a single `court/jurisdiction` string.

    If `content` contains a `/`, it is split on the slash (with any whitespace
    around it) into court and jurisdiction. Otherwise the whole string is the
    court and the jurisdiction is set to the empty string.

    :param document_uri: The URI of the document to update
    :param content: Either `court` or `court/jurisdiction`

    :return: The response object from the `set_document_jurisdiction` call

    :raises ValueError: If `content` contains more than one `/`, because the
        split result is unpacked into exactly two values
    """
    if "/" not in content:
        court, jurisdiction = content, ""
    else:
        court, jurisdiction = re.split("\\s*/\\s*", content)

    self.set_document_court(document_uri, court)
    return self.set_document_jurisdiction(document_uri, jurisdiction)
def set_judgment_this_uri(
    self,
    judgment_uri: DocumentURIString,
) -> requests.Response:
    """
    Run `set_metadata_this_uri.xqy` with the public URLs derived from the document URI.

    Three URLs under `https://caselaw.nationalarchives.gov.uk/` are built from
    the URI with leading slashes removed: one with an `id/` prefix, one without
    it, and one with a `/data.xml` suffix.

    :param judgment_uri: The URI of the document to update

    :return: The response object from MarkLogic
    """
    relative_path = judgment_uri.lstrip("/")
    query_vars: query_dicts.SetMetadataThisUriDict = {
        "uri": self._format_uri_for_marklogic(judgment_uri),
        "content_with_id": f"https://caselaw.nationalarchives.gov.uk/id/{relative_path}",
        "content_without_id": f"https://caselaw.nationalarchives.gov.uk/{relative_path}",
        "content_with_xml": f"https://caselaw.nationalarchives.gov.uk/{relative_path}/data.xml",
    }
    return self._send_to_eval(query_vars, "set_metadata_this_uri.xqy")
def save_locked_judgment_xml(
    self,
    judgment_uri: DocumentURIString,
    judgment_xml: bytes,
    annotation: VersionAnnotation,
) -> requests.Response:
    """
    Save new XML for a judgment which is assumed to be locked already.

    This neither unlocks nor checks in the document. Unlike the tree-based
    methods, the XML is supplied as raw bytes. The bytes are passed through
    `validate_content_hash` before anything else happens.

    :param judgment_uri: The URI of the document to update
    :param judgment_xml: The new XML as UTF-8 encoded bytes
    :param annotation: Annotations to record alongside this version; its calling
        function and calling agent are set by this method

    :return: The response object from MarkLogic
    """
    # Validate first so that nothing else is touched if the hash check fails.
    validate_content_hash(judgment_xml)

    marklogic_uri = self._format_uri_for_marklogic(judgment_uri)

    annotation.set_calling_function("save_locked_judgment_xml")
    annotation.set_calling_agent(self.user_agent)

    query_vars: query_dicts.UpdateLockedJudgmentDict = {
        "uri": marklogic_uri,
        "judgment": judgment_xml.decode("utf-8"),
        "annotation": annotation.as_json,
    }
    return self._send_to_eval(query_vars, "update_locked_judgment.xqy")
Assumes the judgment is already locked; does not unlock or check in. Note that this version assumes the XML is raw bytes, rather than a tree.
def insert_document_xml(
    self,
    document_uri: DocumentURIString,
    document_xml: Element,
    document_type: type[Document],
    annotation: VersionAnnotation,
) -> requests.Response:
    """
    Insert a new XML document into MarkLogic.

    :param document_uri: The URI to insert the document at
    :param document_xml: The XML of the document to insert
    :param document_type: The type class of the document; its
        `type_collection_name` is sent as the type collection
    :param annotation: Annotations to record alongside this version; its calling
        function and calling agent are set by this method

    :return: The response object from MarkLogic
    """
    serialised_xml = ElementTree.tostring(document_xml)
    marklogic_uri = self._format_uri_for_marklogic(document_uri)

    annotation.set_calling_function("insert_document_xml")
    annotation.set_calling_agent(self.user_agent)

    query_vars: query_dicts.InsertDocumentDict = {
        "uri": marklogic_uri,
        "type_collection": document_type.type_collection_name,
        "document": serialised_xml.decode("utf-8"),
        "annotation": annotation.as_json,
    }
    return self._send_to_eval(query_vars, "insert_document.xqy")
Insert a new XML document into MarkLogic.
Parameters
- document_uri: The URI to insert the document at
- document_xml: The XML of the document to insert
- document_type: The type class of the document
- annotation: Annotations to record alongside this version
Returns
The response object from MarkLogic
def update_document_xml(
    self,
    document_uri: DocumentURIString,
    document_xml: Element,
    annotation: VersionAnnotation,
) -> requests.Response:
    """
    Updates an existing XML document in MarkLogic with a new version.

    This uses `dls:document-checkout-update-checkin` to perform this in a single operation.

    :param document_uri: The URI of the document to update
    :param document_xml: The new XML content of the document
    :param annotation: Annotations to record alongside this version; its calling
        function and calling agent are set by this method

    :return: The response object from MarkLogic
    """
    serialised_xml = ElementTree.tostring(document_xml)
    marklogic_uri = self._format_uri_for_marklogic(document_uri)

    annotation.set_calling_function("update_document_xml")
    annotation.set_calling_agent(self.user_agent)

    query_vars: query_dicts.UpdateDocumentDict = {
        "uri": marklogic_uri,
        "judgment": serialised_xml.decode("utf-8"),
        "annotation": annotation.as_json,
    }
    return self._send_to_eval(query_vars, "update_document.xqy")
Updates an existing XML document in MarkLogic with a new version.
This uses dls:document-checkout-update-checkin to perform this in a single operation.
Parameters
- document_uri: The URI of the document to update
- document_xml: The new XML content of the document
- annotation: Annotations to record alongside this version
Returns
The response object from MarkLogic
def checkout_judgment(
    self,
    judgment_uri: DocumentURIString,
    annotation: str = "",
    expires_at_midnight: bool = False,
    timeout_seconds: int = -1,
) -> requests.Response:
    """
    Check out (lock) a document by running `checkout_judgment.xqy`.

    :param judgment_uri: The URI of the document to check out
    :param annotation: Annotation to store with the lock
    :param expires_at_midnight: If True, the timeout sent is the value of
        `calculate_seconds_until_midnight()` and `timeout_seconds` is ignored
    :param timeout_seconds: Lock timeout in seconds. If timeout_seconds is -1,
        the lock never times out

    :return: The response object from MarkLogic
    """
    marklogic_uri = self._format_uri_for_marklogic(judgment_uri)

    lock_timeout = timeout_seconds
    if expires_at_midnight:
        lock_timeout = self.calculate_seconds_until_midnight()

    query_vars: query_dicts.CheckoutJudgmentDict = {
        "uri": marklogic_uri,
        "annotation": annotation,
        "timeout": lock_timeout,
    }
    return self._send_to_eval(query_vars, "checkout_judgment.xqy")
If timeout_seconds is -1, the lock never times out
def get_judgment_checkout_status(
    self,
    judgment_uri: DocumentURIString,
) -> requests.Response:
    """
    Run `get_judgment_checkout_status.xqy` for a document.

    :param judgment_uri: The URI of the document to inspect

    :return: The response object from MarkLogic
    """
    query_vars: query_dicts.GetJudgmentCheckoutStatusDict = {
        "uri": self._format_uri_for_marklogic(judgment_uri),
    }
    return self._send_to_eval(query_vars, "get_judgment_checkout_status.xqy")
def get_judgment_checkout_status_message(
    self,
    judgment_uri: DocumentURIString,
) -> Optional[str]:
    """
    Return the annotation of the lock or `None` if there is no lock.

    `None` is returned when the response body is empty, when its first part is an empty string, or when
    the returned XML has no `dls:annotation` child (previously the last case raised `AttributeError`).

    :param judgment_uri: The URI of the document to inspect

    :return: The lock annotation as a string, or `None`
    """
    response = self.get_judgment_checkout_status(judgment_uri)
    if not response.content:
        return None

    content = decoder.MultipartDecoder.from_response(response).parts[0].text
    if content == "":
        return None

    annotation_node = ElementTree.fromstring(content).find(
        "dls:annotation",
        namespaces={"dls": "http://marklogic.com/xdmp/dls"},
    )
    if annotation_node is None:
        return None

    # NOTE(review): an empty annotation element has `.text` of None, which str() renders as "None".
    # Left unchanged because callers may rely on a truthy value meaning "locked" — confirm before altering.
    return str(annotation_node.text)
Return the annotation of the lock or None if there is no lock.
def get_judgment_version(
    self,
    judgment_uri: DocumentURIString,
    version: int,
) -> requests.Response:
    """
    Run `get_judgment_version.xqy` for one version of a document.

    :param judgment_uri: The URI of the document
    :param version: The version number; sent to the query as a string

    :return: The response object from MarkLogic
    """
    marklogic_uri = self._format_uri_for_marklogic(judgment_uri)
    query_vars: query_dicts.GetJudgmentVersionDict = {
        "uri": marklogic_uri,
        "version": str(version),
    }
    return self._send_to_eval(query_vars, "get_judgment_version.xqy")
def validate_document(self, document_uri: DocumentURIString) -> bool:
    """
    Run `validate_document.xqy` and report whether the result is free of errors.

    :param document_uri: The URI of the document to validate

    :return: `True` if the returned XML contains no `error:error` elements, otherwise `False`
    """
    query_vars: query_dicts.ValidateDocumentDict = {
        "uri": self._format_uri_for_marklogic(document_uri),
    }
    response = self._send_to_eval(query_vars, "validate_document.xqy")
    first_part = decoder.MultipartDecoder.from_response(response).parts[0].text
    validation_errors = ElementTree.fromstring(first_part).findall(
        ".//error:error",
        {"error": "http://marklogic.com/xdmp/error"},
    )
    return not validation_errors
def has_unique_content_hash(self, judgment_uri: DocumentURIString) -> bool:
    """
    Returns True if the content hash for this document is unique (not shared with other documents).

    The answer comes from `check_content_hash_unique_by_uri.xqy`; only the exact string `"true"` counts as unique.
    """
    query_vars: CheckContentHashUniqueByUriDict = {
        "uri": self._format_uri_for_marklogic(judgment_uri),
    }
    answer = self._eval_and_decode(query_vars, "check_content_hash_unique_by_uri.xqy")
    return answer == "true"
Returns True if the content hash for this document is unique (not shared with other documents).
def eval(
    self,
    xquery_path: str,
    vars: str,
    accept_header: str = "multipart/mixed",
    timeout: tuple[float, float] = (CONNECT_TIMEOUT, READ_TIMEOUT),
) -> requests.Response:
    """
    POST the contents of an XQuery file to MarkLogic's `LATEST/eval` endpoint.

    :param xquery_path: Path of the XQuery file whose text is sent as the query
    :param vars: The variables for the query, already serialised to a string
    :param accept_header: Value of the `Accept` header
    :param timeout: `(connect, read)` timeout pair passed to the HTTP session

    :return: The response object from MarkLogic, after `_raise_for_status` has checked it
    """
    xquery_source = Path(xquery_path).read_text()

    if DEBUG:
        print(f"Sending {vars} to {xquery_path}")

    response = self.session.request(
        "POST",
        url=self._path_to_request_url("LATEST/eval"),
        headers={
            "Content-type": "application/x-www-form-urlencoded",
            "Accept": accept_header,
        },
        data={
            "xquery": xquery_source,
            "vars": vars,
        },
        timeout=timeout,
    )
    # Raise relevant exception for an erroneous response
    self._raise_for_status(response)
    return response
def invoke(
    self,
    module: str,
    vars: str,
    accept_header: str = "multipart/mixed",
    timeout: Optional[tuple[float, float]] = None,
) -> requests.Response:
    """
    POST a request to MarkLogic's `LATEST/invoke` endpoint to run a stored module.

    :param module: The module to invoke
    :param vars: The variables for the module, already serialised to a string
    :param accept_header: Value of the `Accept` header
    :param timeout: Optional `(connect, read)` timeout pair passed to the HTTP session. The default of
        `None` applies no timeout, which matches the behaviour before this parameter existed.

    :return: The response object from MarkLogic, after `_raise_for_status` has checked it
    """
    headers = {
        "Content-type": "application/x-www-form-urlencoded",
        "Accept": accept_header,
    }
    data = {
        "module": module,
        "vars": vars,
    }
    response = self.session.request(
        "POST",
        url=self._path_to_request_url("LATEST/invoke"),
        headers=headers,
        data=data,
        timeout=timeout,
    )
    # Raise relevant exception for an erroneous response
    self._raise_for_status(response)
    return response
def advanced_search(self, search_parameters: SearchParameters) -> requests.Response:
    """
    Performs a search on the entire document set.

    The `show_unpublished` value on `search_parameters` is replaced in place with the result of
    `verify_show_unpublished` before the search payload is built.

    :param search_parameters: The search criteria; its `as_marklogic_payload()` output is sent as JSON

    :return: The response object from MarkLogic
    """
    search_parameters.show_unpublished = self.verify_show_unpublished(
        search_parameters.show_unpublished,
    )
    payload = json.dumps(search_parameters.as_marklogic_payload())
    # Module path as stored on MarkLogic.
    return self.invoke("/judgments/search/search-v2.xqy", payload)
Performs a search on the entire document set.
Parameters
- query:
- court:
- judge:
- party:
- neutral_citation:
- document_name:
- consignment_number:
- specific_keyword:
- order:
- date_from:
- date_to:
- page:
- page_size:
- show_unpublished: If True, both published and unpublished documents will be returned
- only_unpublished: If True, will only return unpublished documents. Ignores the value of show_unpublished
- collections:
Returns
def eval_xslt(
    self,
    judgment_uri: DocumentURIString,
    version_uri: Optional[DocumentURIString] = None,
    show_unpublished: bool = False,
    xsl_filename: str = DEFAULT_XSL_TRANSFORM,
    query: Optional[str] = None,
) -> requests.Response:
    """
    Run `xslt_transform.xqy` against a document.

    :param judgment_uri: The URI of the document to transform
    :param version_uri: Optional URI of a specific version; `None` is sent when not given
    :param show_unpublished: Requested visibility of unpublished documents; passed through `verify_show_unpublished` first
    :param xsl_filename: Name of the XSL stylesheet to apply
    :param query: Optional query string passed through to the transform

    :return: The response object from MarkLogic
    """
    marklogic_document_uri = self._format_uri_for_marklogic(judgment_uri)

    marklogic_document_version_uri = None
    if version_uri:
        marklogic_document_version_uri = MarkLogicDocumentVersionURIString(
            self._format_uri_for_marklogic(version_uri),
        )

    # Read from the XSLT_IMAGE_LOCATION environment variable; empty string when unset.
    image_location = os.getenv("XSLT_IMAGE_LOCATION", "")

    permitted_show_unpublished = self.verify_show_unpublished(show_unpublished)

    query_vars: query_dicts.XsltTransformDict = {
        "uri": marklogic_document_uri,
        "version_uri": marklogic_document_version_uri,
        "show_unpublished": permitted_show_unpublished,
        "img_location": image_location,
        "xsl_filename": xsl_filename,
        "query": query,
    }
    return self._send_to_eval(query_vars, "xslt_transform.xqy")
def accessible_judgment_transformation(
    self,
    judgment_uri: DocumentURIString,
    version_uri: Optional[DocumentURIString] = None,
    show_unpublished: bool = False,
) -> requests.Response:
    """
    Transform a document with the default stylesheet (`DEFAULT_XSL_TRANSFORM`) via `eval_xslt`.

    :param judgment_uri: The URI of the document to transform
    :param version_uri: Optional URI of a specific version
    :param show_unpublished: Forwarded unchanged to `eval_xslt`

    :return: The response object from MarkLogic
    """
    transform_args = (judgment_uri, version_uri, show_unpublished)
    return self.eval_xslt(*transform_args, xsl_filename=DEFAULT_XSL_TRANSFORM)
def original_judgment_transformation(
    self,
    judgment_uri: DocumentURIString,
    version_uri: Optional[DocumentURIString] = None,
    show_unpublished: bool = False,
) -> requests.Response:
    """
    Transform a document with the `as-handed-down.xsl` stylesheet via `eval_xslt`.

    :param judgment_uri: The URI of the document to transform
    :param version_uri: Optional URI of a specific version
    :param show_unpublished: Forwarded unchanged to `eval_xslt`

    :return: The response object from MarkLogic
    """
    transform_args = (judgment_uri, version_uri, show_unpublished)
    return self.eval_xslt(*transform_args, xsl_filename="as-handed-down.xsl")
def get_property_as_node(self, judgment_uri: DocumentURIString, name: str) -> Optional[etree._Element]:
    """
    Fetch a property via `get_property_as_node.xqy` and parse it as XML.

    :param judgment_uri: The URI of the document
    :param name: The name of the property

    :return: The parsed element, or `None` if the query returned an empty value
    """
    query_vars: query_dicts.GetPropertyAsNodeDict = {
        "uri": self._format_uri_for_marklogic(judgment_uri),
        "name": name,
    }
    raw_node = self._eval_and_decode(query_vars, "get_property_as_node.xqy")
    return etree.fromstring(raw_node) if raw_node else None
def get_version_created_datetime(self, judgment_uri: DocumentURIString) -> datetime:
    """
    Return the datetime reported by `get_version_created.xqy` for a document.

    The returned string must match `%Y-%m-%dT%H:%M:%S.%f%z` (fractional seconds and a UTC offset are
    both required); anything else makes `datetime.strptime` raise `ValueError`.

    :param judgment_uri: The URI of the document

    :return: The parsed, timezone-aware datetime
    """
    query_vars: query_dicts.GetVersionCreatedDict = {
        "uri": self._format_uri_for_marklogic(judgment_uri),
    }
    raw_timestamp = self._eval_and_decode(query_vars, "get_version_created.xqy")
    return datetime.strptime(raw_timestamp, "%Y-%m-%dT%H:%M:%S.%f%z")
def set_property(
    self,
    judgment_uri: DocumentURIString,
    name: str,
    value: str,
) -> requests.Response:
    """
    Set a string property on a document by running `set_property.xqy`.

    :param judgment_uri: The URI of the document
    :param name: The name of the property
    :param value: The value to store

    :return: The response object from MarkLogic
    """
    query_vars: query_dicts.SetPropertyDict = {
        "uri": self._format_uri_for_marklogic(judgment_uri),
        "value": value,
        "name": name,
    }
    return self._send_to_eval(query_vars, "set_property.xqy")
def set_property_as_node(
    self,
    judgment_uri: DocumentURIString,
    name: str,
    value: etree._Element,
) -> requests.Response:
    """
    Given a root node, set the value of the MarkLogic property for a document to the _contents_ of that root node.
    The root node itself is discarded.

    :param judgment_uri: The URI of the document
    :param name: The name of the property
    :param value: The root node; it is serialised with `etree.tostring` before being sent

    :return: The response object from MarkLogic
    """
    serialised_node = etree.tostring(value).decode()
    query_vars: query_dicts.SetPropertyAsNodeDict = {
        "uri": self._format_uri_for_marklogic(judgment_uri),
        "value": serialised_node,
        "name": name,
    }
    return self._send_to_eval(query_vars, "set_property_as_node.xqy")
Given a root node, set the value of the MarkLogic property for a document to the _contents_ of that root node. The root node itself is discarded.
def set_boolean_property(
    self,
    judgment_uri: DocumentURIString,
    name: str,
    value: bool,
) -> requests.Response:
    """
    Set a property within MarkLogic which is specifically a boolean.

    Since XML has no concept of boolean, the actual value in the database is set to `"true"` or `"false"`.

    :param judgment_uri: The URI of the document
    :param name: The name of the property
    :param value: The boolean to store; any truthy value is stored as `"true"`

    :return: The response object from MarkLogic
    """
    uri = self._format_uri_for_marklogic(judgment_uri)
    string_value = "true" if value else "false"
    vars: query_dicts.SetBooleanPropertyDict = {
        "uri": uri,
        "value": string_value,
        "name": name,
    }
    return self._send_to_eval(vars, "set_boolean_property.xqy")
def get_boolean_property(self, judgment_uri: DocumentURIString, name: str) -> bool:
    """
    Get a property from MarkLogic which is specifically a boolean.

    :param judgment_uri: The URI of the document
    :param name: The name of the property

    :return: `True` if the property exists and has a value of `"true"`, otherwise `False`
    """
    return self.get_property(judgment_uri, name) == "true"
Get a property from MarkLogic which is specifically a boolean.
Returns
True if the property exists and has a value of "true", otherwise False
def set_datetime_property(
    self,
    judgment_uri: DocumentURIString,
    name: str,
    value: datetime,
) -> requests.Response:
    """
    Set a property within MarkLogic which is specifically a datetime.

    :param judgment_uri: The URI of the document
    :param name: The name of the property
    :param value: The datetime to store; sent as its `isoformat()` string

    :return: The response object from MarkLogic
    """
    query_vars: query_dicts.SetDatetimePropertyDict = {
        "uri": self._format_uri_for_marklogic(judgment_uri),
        "value": value.isoformat(),
        "name": name,
    }
    return self._send_to_eval(query_vars, "set_datetime_property.xqy")
Set a property within MarkLogic which is specifically a datetime.
def get_datetime_property(self, judgment_uri: DocumentURIString, name: str) -> Optional[datetime]:
    """
    Get a property from MarkLogic which is specifically a datetime.

    :param judgment_uri: The URI of the document
    :param name: The name of the property

    :return: A datetime with the value of the property, or `None` if it does not exist
    """
    raw_value = self.get_property(judgment_uri, name)
    # An empty property value is treated the same as a missing one.
    return isoparse(raw_value) if raw_value else None
Get a property from MarkLogic which is specifically a datetime.
Returns
A datetime with the value of the property, or
None if it does not exist
def get_last_modified(self, judgment_uri: DocumentURIString) -> str:
    """
    Return the result of `get_last_modified.xqy` for a document as a string.

    :param judgment_uri: The URI of the document

    :return: The text of the first part of the multipart response, or `""` if the response has no text
    """
    query_vars: query_dicts.GetLastModifiedDict = {
        "uri": self._format_uri_for_marklogic(judgment_uri),
    }
    response = self._send_to_eval(query_vars, "get_last_modified.xqy")

    if response.text:
        first_part = decoder.MultipartDecoder.from_response(response).parts[0]
        return str(first_part.text)
    return ""
def copy_document(
    self,
    old: DocumentURIString,
    new: DocumentURIString,
) -> requests.Response:
    """
    Run `copy_document.xqy` with a source and a destination URI.

    :param old: The URI of the existing document
    :param new: The URI to copy it to

    :return: The response object from MarkLogic
    """
    source_uri = self._format_uri_for_marklogic(old)
    destination_uri = self._format_uri_for_marklogic(new)
    query_vars: query_dicts.CopyDocumentDict = {
        "old_uri": source_uri,
        "new_uri": destination_uri,
    }
    return self._send_to_eval(query_vars, "copy_document.xqy")
def user_has_privilege(
    self,
    username: str,
    privilege_uri: MarkLogicPrivilegeURIString,
    privilege_action: str,
) -> requests.Response:
    """
    Run `user_has_privilege.xqy` for a user and a privilege.

    :param username: The user to check
    :param privilege_uri: The URI of the privilege
    :param privilege_action: The action on that privilege to check

    :return: The raw response object from MarkLogic; callers decode the answer themselves
    """
    query_vars: query_dicts.UserHasPrivilegeDict = dict(
        user=username,
        privilege_uri=privilege_uri,
        privilege_action=privilege_action,
    )
    return self._send_to_eval(query_vars, "user_has_privilege.xqy")
def user_can_view_unpublished_judgments(self, username: str) -> bool:
    """
    Report whether a user may view unpublished documents.

    Users for whom `user_has_admin_role` is true are always allowed; everyone else needs the
    `can-view-unpublished-documents` execute privilege.

    :param username: The user to check

    :return: `True` if the user is allowed, otherwise `False`
    """
    if self.user_has_admin_role(username):
        return True

    privilege = MarkLogicPrivilegeURIString(
        "https://caselaw.nationalarchives.gov.uk/custom/privileges/can-view-unpublished-documents",
    )
    privilege_response = self.user_has_privilege(username, privilege, "execute")
    answer = get_single_string_from_marklogic_response(privilege_response)
    return answer.lower() == "true"
def calculate_seconds_until_midnight(self, now: Optional[datetime] = None) -> int:
    """
    Return the whole number of seconds from `now` until the next midnight.

    Based on https://stackoverflow.com/questions/45986035/seconds-until-end-of-day-in-python

    Exactly at midnight the result is a full day (86400) rather than 0, and timezone-aware
    datetimes are supported (midnight is taken in the same timezone as `now`).

    :param now: The moment to measure from; defaults to `datetime.now()`

    :return: Seconds until 00:00 of the following day, with any fraction of a second truncated
    """
    if now is None:
        now = datetime.now()
    tomorrow = now + timedelta(days=1)
    # Carry over tzinfo so an aware `now` is not subtracted from a naive midnight.
    next_midnight = datetime.combine(tomorrow, time.min, tzinfo=now.tzinfo)
    # total_seconds() includes whole days; `.seconds` alone reports 0 for an exact 24h gap.
    return int((next_midnight - now).total_seconds())
Get the number of seconds until the end of the day on the datetime passed, or on the current time if none is passed. See https://stackoverflow.com/questions/45986035/seconds-until-end-of-day-in-python
def get_properties_for_search_results(
    self,
    judgment_uris: list[DocumentURIString],
) -> str:
    """
    Run `get_properties_for_search_results.xqy` for a list of documents.

    :param judgment_uris: The URIs of the documents

    :return: The single string contained in the MarkLogic response
    """
    marklogic_uris = list(map(self._format_uri_for_marklogic, judgment_uris))
    query_vars: query_dicts.GetPropertiesForSearchResultsDict = {"uris": marklogic_uris}
    return get_single_string_from_marklogic_response(
        self._send_to_eval(query_vars, "get_properties_for_search_results.xqy"),
    )
def update_document_uri(self, old_uri: DocumentURIString, new_citation: NeutralCitationString) -> DocumentURIString:
    """
    Move the document at `old_uri` to the correct location based on the neutral citation.

    The new neutral citation *must* not already exist (that is handled elsewhere).
    This might not be needed; changing the URI/neutral citation is vanishingly rare.

    :param old_uri: The current URI of the document
    :param new_citation: The neutral citation the new location is based on

    :return: The value returned by `move.update_document_uri`
    """
    relocated_uri = move.update_document_uri(old_uri, new_citation, api_client=self)
    return relocated_uri
Move the document at old_uri to the correct location based on the neutral citation. The new neutral citation must not already exist (that is handled elsewhere). This might not be needed; changing the URI/neutral citation is vanishingly rare.
def get_combined_stats_table(self) -> list[list[Any]]:
    """
    Run the combined statistics table xquery and return the result as a list of lists,
    each representing a table row.
    """
    response = self._send_to_eval({}, "get_combined_stats_table.xqy")
    rows: list[list[Any]] = json.loads(get_single_string_from_marklogic_response(response))
    return rows
Run the combined statistics table xquery and return the result as a list of lists, each representing a table row.
def get_highest_enrichment_version(self) -> tuple[int, int]:
    """
    This gets the highest enrichment version in the database,
    so if nothing has been enriched with the most recent version of enrichment,
    this won't reflect that change.

    :return: A pair of integers taken from columns 1 and 2 of the second row of the query result
    """
    response = self._send_to_eval({}, "get_highest_enrichment_version.xqy")
    table = json.loads(get_single_string_from_marklogic_response(response))

    # Row 0 is skipped — presumably a header row; TODO confirm against the xquery.
    version_row = table[1]
    return (int(version_row[1]), int(version_row[2]))
This gets the highest enrichment version in the database, so if nothing has been enriched with the most recent version of enrichment, this won't reflect that change.
def get_pending_enrichment_for_version(
    self,
    target_enrichment_version: tuple[int, int],
    target_parser_version: tuple[int, int],
    maximum_records: int = 1000,
) -> list[list[Any]]:
    """
    Retrieve documents which are not yet enriched with a given version.

    :param target_enrichment_version: `(major, minor)` enrichment version to compare against
    :param target_parser_version: `(major, minor)` parser version to compare against
    :param maximum_records: Upper bound passed to the query

    :return: The JSON-decoded query result as a list of lists
    """
    enrichment_major, enrichment_minor = target_enrichment_version[0], target_enrichment_version[1]
    parser_major, parser_minor = target_parser_version[0], target_parser_version[1]
    query_vars: query_dicts.GetPendingEnrichmentForVersionDict = {
        "target_enrichment_major_version": enrichment_major,
        "target_enrichment_minor_version": enrichment_minor,
        "target_parser_major_version": parser_major,
        "target_parser_minor_version": parser_minor,
        "maximum_records": maximum_records,
    }
    response = self._send_to_eval(query_vars, "get_pending_enrichment_for_version.xqy")
    rows: list[list[Any]] = json.loads(get_single_string_from_marklogic_response(response))
    return rows
Retrieve documents which are not yet enriched with a given version.
def get_recently_enriched(
    self,
) -> list[list[Any]]:
    """
    Run `get_recently_enriched.xqy` and return its result.

    :return: The JSON-decoded query result as a list of lists
    """
    response = self._send_to_eval({}, "get_recently_enriched.xqy")
    rows: list[list[Any]] = json.loads(get_single_string_from_marklogic_response(response))
    return rows
Retrieve documents which have recently been enriched.
def get_highest_parser_version(self) -> tuple[int, int]:
    """
    This gets the highest parser version in the database, so if nothing has been parsed with the
    most recent version of the parser, this won't reflect that change.

    :return: A pair of integers taken from columns 1 and 2 of the second row of the query result
    """
    response = self._send_to_eval({}, "get_highest_parser_version.xqy")
    table = json.loads(get_single_string_from_marklogic_response(response))

    # Row 0 is skipped — presumably a header row; TODO confirm against the xquery.
    version_row = table[1]
    return (int(version_row[1]), int(version_row[2]))
This gets the highest parser version in the database, so if nothing has been parsed with the most recent version of the parser, this won't reflect that change.
def get_documents_pending_parse_for_version(
    self,
    target_version: tuple[int, int],
    maximum_records: int = 1000,
) -> list[list[Any]]:
    """
    Retrieve a list of documents which are not yet parsed with a given version.

    :param target_version: `(major, minor)` parser version to compare against
    :param maximum_records: Upper bound passed to the query

    :return: The JSON-decoded query result as a list of lists
    """
    query_vars: query_dicts.GetPendingParseForVersionDocumentsDict = {
        "target_major_version": target_version[0],
        "target_minor_version": target_version[1],
        "maximum_records": maximum_records,
    }
    response = self._send_to_eval(query_vars, "get_pending_parse_for_version_documents.xqy")
    rows: list[list[Any]] = json.loads(get_single_string_from_marklogic_response(response))
    return rows
Retrieve a list of documents which are not yet parsed with a given version.
def get_count_pending_parse_for_version(
    self,
    target_version: tuple[int, int],
) -> int:
    """
    Get the total number of documents which are not yet parsed with a given version.

    :param target_version: `(major, minor)` parser version to compare against

    :return: The first column of the second row of the query result, as an integer
    """
    query_vars: query_dicts.GetPendingParseForVersionCountDict = {
        "target_major_version": target_version[0],
        "target_minor_version": target_version[1],
    }
    response = self._send_to_eval(query_vars, "get_pending_parse_for_version_count.xqy")
    table = json.loads(get_single_string_from_marklogic_response(response))
    count_row = table[1]
    return int(count_row[0])
Get the total number of documents which are not yet parsed with a given version.
def get_recently_parsed(
    self,
) -> list[list[Any]]:
    """
    Run `get_recently_parsed.xqy` and return its result.

    :return: The JSON-decoded query result as a list of lists
    """
    response = self._send_to_eval({}, "get_recently_parsed.xqy")
    rows: list[list[Any]] = json.loads(get_single_string_from_marklogic_response(response))
    return rows
Retrieve documents which have recently been parsed.
def get_locked_documents(
    self,
) -> list[DocumentLock]:
    """
    Retrieve all currently locked documents.

    :return: One `DocumentLock` per part of the MarkLogic response, sorted by ascending `timestamp`
    """
    raw_locks = get_multipart_strings_from_marklogic_response(
        self._send_to_eval({}, "get_locked_documents.xqy")
    )
    locks = map(DocumentLock.from_string, raw_locks)
    return sorted(locks, key=lambda lock: lock.timestamp)
Retrieve all currently locked documents.
def get_missing_fclid(
    self,
    maximum_records: int = 50,
) -> list[str]:
    """
    Retrieve the URIs of published documents which do not have an identifier in the `fclid` schema.

    :param maximum_records: Upper bound passed to the query

    :return: The strings from each part of the MarkLogic response
    """
    query_vars: query_dicts.GetMissingFclidDict = {"maximum_records": maximum_records}
    response = self._send_to_eval(query_vars, "get_missing_fclid.xqy")
    uris: list[str] = get_multipart_strings_from_marklogic_response(response)
    return uris
Retrieve the URIs of published documents which do not have an identifier in the fclid schema.
def resolve_from_identifier_slug(
    self, identifier_slug: DocumentIdentifierSlug, published_only: bool = True
) -> IdentifierResolutions:
    """
    Given a PUI/EUI url, look up the precomputed slug and return the
    MarkLogic document URIs which match that slug. Multiple returns should be anticipated.

    :param identifier_slug: The slug to resolve
    :param published_only: Sent to the query as `1` or `0`

    :return: The resolutions built from the MarkLogic output
    """
    query_vars: query_dicts.ResolveFromIdentifierSlugDict = {
        "identifier_slug": identifier_slug,
        "published_only": int(published_only),
    }
    response = self._send_to_eval(query_vars, "resolve_from_identifier_slug.xqy")
    raw_resolutions: list[str] = get_multipart_strings_from_marklogic_response(response)
    return IdentifierResolutions.from_marklogic_output(raw_resolutions)
Given a PUI/EUI url, look up the precomputed slug and return the MarkLogic document URIs which match that slug. Multiple returns should be anticipated
def resolve_from_identifier_value(
    self, identifier_value: DocumentIdentifierValue, published_only: bool = True
) -> IdentifierResolutions:
    """
    Given an identifier value, return the MarkLogic document URIs which match that value.
    Multiple returns should be anticipated.

    :param identifier_value: The identifier value to resolve
    :param published_only: Sent to the query as `1` or `0`

    :return: The resolutions built from the MarkLogic output
    """
    query_vars: query_dicts.ResolveFromIdentifierValueDict = {
        "identifier_value": identifier_value,
        "published_only": int(published_only),
    }
    response = self._send_to_eval(query_vars, "resolve_from_identifier_value.xqy")
    raw_resolutions: list[str] = get_multipart_strings_from_marklogic_response(response)
    return IdentifierResolutions.from_marklogic_output(raw_resolutions)
Given an identifier value, return the MarkLogic document URIs which match that value. Multiple returns should be anticipated.
def get_next_document_sequence_number(self) -> int:
    """
    Increment the MarkLogic sequence number by one and return the value.

    :return: The decoded result of `get_next_document_sequence_number.xqy`, converted to an integer
    """
    raw_sequence_number = self._eval_and_decode({}, "get_next_document_sequence_number.xqy")
    return int(raw_sequence_number)
Increment the MarkLogic sequence number by one and return the value.