caselawclient.Client

   1import importlib.metadata
   2import json
   3import logging
   4import os
   5import re
   6import warnings
   7from datetime import datetime, time, timedelta
   8from pathlib import Path
   9from typing import Any, Optional, Type, Union
  10from xml.etree.ElementTree import Element
  11
  12import environ
  13import requests
  14from dateutil.parser import isoparse
  15from defusedxml import ElementTree
  16from defusedxml.ElementTree import ParseError, fromstring
  17from ds_caselaw_utils.types import NeutralCitationString
  18from lxml import etree
  19from requests.auth import HTTPBasicAuth
  20from requests.structures import CaseInsensitiveDict
  21from requests_toolbelt.multipart import decoder
  22
  23from caselawclient import xquery_type_dicts as query_dicts
  24from caselawclient.identifier_resolution import IdentifierResolutions
  25from caselawclient.models.documents import (
  26    DOCUMENT_COLLECTION_URI_JUDGMENT,
  27    DOCUMENT_COLLECTION_URI_PRESS_SUMMARY,
  28    Document,
  29)
  30from caselawclient.models.documents.versions import VersionAnnotation
  31from caselawclient.models.judgments import Judgment
  32from caselawclient.models.press_summaries import PressSummary
  33from caselawclient.models.utilities import move
  34from caselawclient.search_parameters import SearchParameters
  35from caselawclient.types import DocumentIdentifierSlug, DocumentIdentifierValue, DocumentLock, DocumentURIString
  36from caselawclient.xquery_type_dicts import (
  37    CheckContentHashUniqueByUriDict,
  38    MarkLogicDocumentURIString,
  39    MarkLogicDocumentVersionURIString,
  40    MarkLogicPrivilegeURIString,
  41)
  42
  43from .content_hash import validate_content_hash
  44from .errors import (
  45    DocumentNotFoundError,
  46    GatewayTimeoutError,
  47    MarklogicAPIError,
  48    MarklogicBadRequestError,
  49    MarklogicCheckoutConflictError,
  50    MarklogicCommunicationError,
  51    MarklogicNotPermittedError,
  52    MarklogicResourceLockedError,
  53    MarklogicResourceNotCheckedOutError,
  54    MarklogicResourceNotFoundError,
  55    MarklogicResourceUnmanagedError,
  56    MarklogicUnauthorizedError,
  57    MarklogicValidationFailedError,
  58)
  59
# Environment reader provided by the `environ` package.
env = environ.Env()

# Requests timeouts: https://requests.readthedocs.io/en/latest/user/advanced/
# Both can be overridden with the CONNECT_TIMEOUT / READ_TIMEOUT environment variables; together they form the default
# `timeout` tuple used by `_send_to_eval()` and `eval()`.
CONNECT_TIMEOUT = float(os.environ.get("CONNECT_TIMEOUT", "3.05"))
READ_TIMEOUT = float(os.environ.get("READ_TIMEOUT", "10.0"))

# Directory containing this module; `_xquery_path()` looks for XQuery files in its `xquery` subdirectory.
ROOT_DIR = os.path.dirname(os.path.realpath(__file__))
# Default value of the `xsl_filename` parameter of `eval_xslt()`.
DEFAULT_XSL_TRANSFORM = "accessible-html.xsl"

# Version of the installed distribution, falling back to "0" when no package metadata can be found.
try:
    VERSION = importlib.metadata.version("ds-caselaw-marklogic-api-client")
except importlib.metadata.PackageNotFoundError:
    VERSION = "0"
# Default `User-Agent` for `MarklogicApiClient` when the caller does not supply one.
DEFAULT_USER_AGENT = f"ds-caselaw-marklogic-api-client/{VERSION}"
  74
  75DEBUG: bool = bool(os.getenv("DEBUG", default=False))
  76
  77
  78class NoResponse(Exception):
  79    """A requests HTTPError has no response. We expect this will never happen."""
  80
  81
  82class MultipartResponseLongerThanExpected(Exception):
  83    """
  84    MarkLogic has returned a multipart response with more than one part, where only a single part was expected.
  85    """
  86
  87
  88def get_multipart_strings_from_marklogic_response(
  89    response: requests.Response,
  90) -> list[str]:
  91    """
  92    Given a HTTP response from a MarkLogic server, extract the text content from each part of the response.
  93
  94    :param response: A multipart HTTP response
  95
  96    :return: A list of the text within each part of the response
  97    """
  98    if not (response.content):
  99        return []
 100
 101    multipart_data = decoder.MultipartDecoder.from_response(response)
 102
 103    return [part.text for part in multipart_data.parts]
 104
 105
 106def get_multipart_bytes_from_marklogic_response(
 107    response: requests.Response,
 108) -> list[bytes]:
 109    if not (response.content):
 110        return []
 111
 112    multipart_data = decoder.MultipartDecoder.from_response(response)
 113
 114    return [part.content for part in multipart_data.parts]
 115
 116
 117def get_single_string_from_marklogic_response(
 118    response: requests.Response,
 119) -> str:
 120    """
 121    Given a HTTP response from a MarkLogic server, assuming the response contains a single part, extract the text
 122    content of the response.
 123
 124    :param response: A multipart HTTP response
 125
 126    :return: The text of the response
 127
 128    :raises MultipartResponseLongerThanExpected: If the response from MarkLogic has more than one part
 129    """
 130    parts = get_multipart_strings_from_marklogic_response(response)
 131    part_count = len(parts)
 132
 133    if part_count == 0:
 134        # TODO: This should strictly speaking be None, but fixing this involves refactoring a lot of other stuff which
 135        # relies on "" being falsy.
 136        return ""
 137
 138    if part_count > 1:
 139        raise MultipartResponseLongerThanExpected(
 140            f"Response returned {part_count} multipart items, expected 1",
 141        )
 142
 143    return parts[0]
 144
 145
 146def get_single_bytestring_from_marklogic_response(
 147    response: requests.Response,
 148) -> bytes:
 149    parts = get_multipart_bytes_from_marklogic_response(response)
 150    part_count = len(parts)
 151
 152    if part_count == 0:
 153        # TODO: This should strictly speaking be None, but fixing this involves refactoring a lot of other stuff which
 154        # relies on "" being falsy.
 155        return b""
 156
 157    if part_count > 1:
 158        raise MultipartResponseLongerThanExpected(
 159            f"Response returned {part_count} multipart items, expected 1",
 160        )
 161
 162    return parts[0]
 163
 164
 165class MarklogicApiClient:
 166    """
 167    The base class for interacting with a MarkLogic instance.
 168    """
 169
    # HTTP status code -> exception class. `_raise_for_status()` looks the response status up here first.
    http_error_classes: dict[int, Type[MarklogicAPIError]] = {
        400: MarklogicBadRequestError,
        401: MarklogicUnauthorizedError,
        403: MarklogicNotPermittedError,
        404: MarklogicResourceNotFoundError,
        504: GatewayTimeoutError,
    }
    # MarkLogic error code -> exception class. The keys are regular expressions: `_get_error_code_class()` tries them
    # in the order written here using `re.fullmatch` and returns the class of the first one that matches.
    error_code_classes: dict[str, Type[MarklogicAPIError]] = {
        "XDMP-DOCNOTFOUND": MarklogicResourceNotFoundError,
        "XDMP-LOCKCONFLICT": MarklogicResourceLockedError,
        "XDMP-LOCKED": MarklogicResourceLockedError,
        "DLS-UNMANAGED": MarklogicResourceUnmanagedError,
        "DLS-NOTCHECKEDOUT": MarklogicResourceNotCheckedOutError,
        "DLS-CHECKOUTCONFLICT": MarklogicCheckoutConflictError,
        "SEC-PRIVDNE": MarklogicNotPermittedError,
        "XDMP-VALIDATE.*": MarklogicValidationFailedError,
        "FCL-DOCUMENTNOTFOUND.*": DocumentNotFoundError,
    }

    # Exception class used when neither the status code nor the MarkLogic error code is recognised.
    default_http_error_class = MarklogicCommunicationError
 190
 191    def __init__(
 192        self,
 193        host: str,
 194        username: str,
 195        password: str,
 196        use_https: bool,
 197        user_agent: str = DEFAULT_USER_AGENT,
 198    ) -> None:
 199        self.host = host
 200        self.username = username
 201        self.password = password
 202        self.base_url = f"{'https' if use_https else 'http'}://{self.host}:8011"
 203        # Apply auth / common headers to the session
 204        self.session = requests.Session()
 205        self.session.auth = HTTPBasicAuth(username, password)
 206        self.session.headers.update({"User-Agent": user_agent})
 207        self.user_agent = user_agent
 208
 209    def get_press_summaries_for_document_uri(
 210        self,
 211        uri: DocumentURIString,
 212    ) -> list[PressSummary]:
 213        """
 214        Returns a list of PressSummary objects associated with a given Document URI
 215        """
 216        vars: query_dicts.GetComponentsForDocumentDict = {
 217            "parent_uri": uri,
 218            "component": "pressSummary",
 219        }
 220        response = self._send_to_eval(vars, "get_components_for_document.xqy")
 221        uris = get_multipart_strings_from_marklogic_response(response)
 222        return [
 223            PressSummary(DocumentURIString(uri.strip("/").strip(".xml")), self) for uri in uris
 224        ]  # TODO: Migrate this strip behaviour into proper manipulation of a MarkLogicURIString
 225
 226    def get_document_by_uri(
 227        self,
 228        uri: DocumentURIString,
 229        search_query: Optional[str] = None,
 230    ) -> Document:
 231        document_type_class = self.get_document_type_from_uri(uri)
 232        return document_type_class(uri, self, search_query=search_query)
 233
 234    def get_document_type_from_uri(self, uri: DocumentURIString) -> Type[Document]:
 235        vars: query_dicts.DocumentCollectionsDict = {
 236            "uri": self._format_uri_for_marklogic(uri),
 237        }
 238        response = self._send_to_eval(vars, "document_collections.xqy")
 239        collections = get_multipart_strings_from_marklogic_response(response)
 240
 241        if DOCUMENT_COLLECTION_URI_JUDGMENT in collections:
 242            return Judgment
 243        if DOCUMENT_COLLECTION_URI_PRESS_SUMMARY in collections:
 244            return PressSummary
 245        return Document
 246
 247    def _get_error_code_class(self, error_code: str) -> Type[MarklogicAPIError]:
 248        """
 249        Get the exception type for a MarkLogic error code, or the first part of one
 250        """
 251        for regex, error in self.error_code_classes.items():
 252            if re.fullmatch(regex, error_code):
 253                return error
 254        print(f"No error code match found for {error_code}")
 255        return self.default_http_error_class
 256
 257    def _path_to_request_url(self, path: str) -> str:
 258        return f"{self.base_url}/{path.lstrip('/')}"
 259
 260    @classmethod
 261    def _get_error_code(cls, content_as_xml: Optional[str]) -> str:
 262        logging.warning(
 263            "XMLTools is deprecated and will be removed in later versions. "
 264            "Use methods from MarklogicApiClient.Client instead.",
 265        )
 266        if not content_as_xml:
 267            return "Unknown error, Marklogic returned a null or empty response"
 268        try:
 269            xml = fromstring(content_as_xml)
 270            return str(
 271                xml.find(
 272                    "message-code",
 273                    namespaces={"": "http://marklogic.com/xdmp/error"},
 274                ).text
 275            )
 276        except (ParseError, TypeError, AttributeError):
 277            return "Unknown error, Marklogic returned a null or empty response"
 278
 279    def _raise_for_status(self, response: requests.Response) -> None:
 280        try:
 281            response.raise_for_status()
 282        except requests.exceptions.HTTPError as e:
 283            if e.response is None:
 284                raise NoResponse
 285            status_code = e.response.status_code
 286            new_error_class = self.http_error_classes.get(
 287                status_code,
 288                self.default_http_error_class,
 289            )
 290            try:
 291                response_body = json.dumps(response.json(), indent=4)
 292            except requests.JSONDecodeError:
 293                response_body = response.text
 294
 295            if new_error_class == self.default_http_error_class:
 296                # Attempt to decode the error code from the response
 297
 298                error_code = self._get_error_code(response.content.decode("utf-8"))
 299
 300                new_error_class = self._get_error_code_class(error_code)
 301
 302            new_exception = new_error_class(
 303                f"{e}. Response body:\n{response_body}",
 304            )
 305            new_exception.response = response
 306            raise new_exception
 307
 308    def _format_uri_for_marklogic(
 309        self,
 310        uri: DocumentURIString,
 311    ) -> MarkLogicDocumentURIString:
 312        """
 313        MarkLogic requires a document URI that begins with a slash `/` and ends in `.xml`. This method ensures any takes
 314        a `DocumentURIString` and converts it to a MarkLogic-friendly `MarkLogicDocumentURIString`.
 315
 316        :return: A `MarkLogicDocumentURIString` at which the document at the given `DocumentURIString` can be located
 317            within MarkLogic.
 318        """
 319        return MarkLogicDocumentURIString(f"/{uri.lstrip('/').rstrip('/')}.xml")
 320
 321    def _xquery_path(self, xquery_file_name: str) -> str:
 322        return os.path.join(ROOT_DIR, "xquery", xquery_file_name)
 323
 324    def _send_to_eval(
 325        self,
 326        vars: query_dicts.MarkLogicAPIDict,
 327        xquery_file_name: str,
 328        timeout: tuple[float, float] = (CONNECT_TIMEOUT, READ_TIMEOUT),
 329    ) -> requests.Response:
 330        return self.eval(
 331            self._xquery_path(xquery_file_name),
 332            vars=json.dumps(vars),
 333            accept_header="application/xml",
 334            timeout=timeout,
 335        )
 336
 337    def _eval_and_decode(
 338        self,
 339        vars: query_dicts.MarkLogicAPIDict,
 340        xquery_file_name: str,
 341    ) -> str:
 342        response = self._send_to_eval(vars, xquery_file_name)
 343        return get_single_string_from_marklogic_response(response)
 344
 345    def _eval_as_bytes(
 346        self,
 347        vars: query_dicts.MarkLogicAPIDict,
 348        xquery_file_name: str,
 349    ) -> bytes:
 350        response = self._send_to_eval(vars, xquery_file_name)
 351        return get_single_bytestring_from_marklogic_response(response)
 352
 353    def prepare_request_kwargs(
 354        self,
 355        method: str,
 356        path: str,
 357        body: Optional[str] = None,
 358        data: Optional[dict[str, Any]] = None,
 359    ) -> dict[str, Any]:
 360        kwargs = dict(url=self._path_to_request_url(path))
 361        if data is not None:
 362            data = {k: v for k, v in data.items() if v is not None}
 363            if method == "GET":
 364                kwargs["params"] = data  # type: ignore
 365            else:
 366                kwargs["data"] = json.dumps(data)
 367        if body is not None:
 368            kwargs["data"] = body
 369        return kwargs
 370
 371    def make_request(
 372        self,
 373        method: str,
 374        path: str,
 375        headers: CaseInsensitiveDict[Union[str, Any]],
 376        body: Optional[str] = None,
 377        data: Optional[dict[str, Any]] = None,
 378    ) -> requests.Response:
 379        kwargs = self.prepare_request_kwargs(method, path, body, data)
 380        self.session.headers = headers
 381        response = self.session.request(method, **kwargs)
 382        # Raise relevant exception for an erroneous response
 383        self._raise_for_status(response)
 384        return response
 385
 386    def GET(self, path: str, headers: dict[str, Any], **data: Any) -> requests.Response:
 387        logging.warning("GET() is deprecated, use eval() or invoke()")
 388        return self.make_request("GET", path, headers, data)  # type: ignore
 389
 390    def POST(
 391        self,
 392        path: str,
 393        headers: dict[str, Any],
 394        **data: Any,
 395    ) -> requests.Response:
 396        logging.warning("POST() is deprecated, use eval() or invoke()")
 397        return self.make_request("POST", path, headers, data)  # type: ignore
 398
 399    def document_exists(self, document_uri: DocumentURIString) -> bool:
 400        uri = self._format_uri_for_marklogic(document_uri)
 401        vars: query_dicts.DocumentExistsDict = {
 402            "uri": uri,
 403        }
 404        decoded_response = self._eval_and_decode(vars, "document_exists.xqy")
 405
 406        if decoded_response == "true":
 407            return True
 408        if decoded_response == "false":
 409            return False
 410        raise RuntimeError("Marklogic response was neither true nor false")
 411
 412    def get_judgment_xml_bytestring(
 413        self,
 414        judgment_uri: DocumentURIString,
 415        version_uri: Optional[DocumentURIString] = None,
 416        show_unpublished: bool = False,
 417        search_query: Optional[str] = None,
 418    ) -> bytes:
 419        marklogic_document_uri = self._format_uri_for_marklogic(judgment_uri)
 420        marklogic_document_version_uri = (
 421            MarkLogicDocumentVersionURIString(
 422                self._format_uri_for_marklogic(version_uri),
 423            )
 424            if version_uri
 425            else None
 426        )
 427        show_unpublished = self.verify_show_unpublished(show_unpublished)
 428
 429        vars: query_dicts.GetJudgmentDict = {
 430            "uri": marklogic_document_uri,
 431            "version_uri": marklogic_document_version_uri,
 432            "show_unpublished": show_unpublished,
 433            "search_query": search_query,
 434        }
 435
 436        response = self._eval_as_bytes(vars, "get_judgment.xqy")
 437        if not response:
 438            raise MarklogicNotPermittedError(
 439                "The document is not published and show_unpublished was not set",
 440            )
 441
 442        return response
 443
 444    def get_judgment_xml(
 445        self,
 446        judgment_uri: DocumentURIString,
 447        version_uri: Optional[DocumentURIString] = None,
 448        show_unpublished: bool = False,
 449        search_query: Optional[str] = None,
 450    ) -> str:
 451        return self.get_judgment_xml_bytestring(
 452            judgment_uri,
 453            version_uri,
 454            show_unpublished,
 455            search_query=search_query,
 456        ).decode(encoding="utf-8")
 457
 458    def set_document_name(
 459        self,
 460        document_uri: DocumentURIString,
 461        content: str,
 462    ) -> requests.Response:
 463        uri = self._format_uri_for_marklogic(document_uri)
 464        vars: query_dicts.SetMetadataNameDict = {"uri": uri, "content": content}
 465        return self._send_to_eval(vars, "set_metadata_name.xqy")
 466
 467    def set_judgment_date(
 468        self,
 469        judgment_uri: DocumentURIString,
 470        content: str,
 471    ) -> requests.Response:
 472        warnings.warn(
 473            "set_judgment_date() is deprecated, use set_document_work_expression_date()",
 474            DeprecationWarning,
 475            stacklevel=2,
 476        )
 477        return self.set_document_work_expression_date(judgment_uri, content)
 478
 479    def set_document_work_expression_date(
 480        self,
 481        document_uri: DocumentURIString,
 482        content: str,
 483    ) -> requests.Response:
 484        uri = self._format_uri_for_marklogic(document_uri)
 485        vars: query_dicts.SetMetadataWorkExpressionDateDict = {
 486            "uri": uri,
 487            "content": content,
 488        }
 489
 490        return self._send_to_eval(vars, "set_metadata_work_expression_date.xqy")
 491
 492    def set_judgment_citation(
 493        self,
 494        judgment_uri: DocumentURIString,
 495        content: str,
 496    ) -> requests.Response:
 497        uri = self._format_uri_for_marklogic(judgment_uri)
 498        vars: query_dicts.SetMetadataCitationDict = {
 499            "uri": uri,
 500            "content": content.strip(),
 501        }
 502
 503        return self._send_to_eval(vars, "set_metadata_citation.xqy")
 504
 505    def set_document_court(
 506        self,
 507        document_uri: DocumentURIString,
 508        content: str,
 509    ) -> requests.Response:
 510        uri = self._format_uri_for_marklogic(document_uri)
 511        vars: query_dicts.SetMetadataCourtDict = {"uri": uri, "content": content}
 512
 513        return self._send_to_eval(vars, "set_metadata_court.xqy")
 514
 515    def set_document_jurisdiction(
 516        self,
 517        document_uri: DocumentURIString,
 518        content: str,
 519    ) -> requests.Response:
 520        uri = self._format_uri_for_marklogic(document_uri)
 521        vars: query_dicts.SetMetadataJurisdictionDict = {"uri": uri, "content": content}
 522        return self._send_to_eval(vars, "set_metadata_jurisdiction.xqy")
 523
 524    def set_document_court_and_jurisdiction(
 525        self,
 526        document_uri: DocumentURIString,
 527        content: str,
 528    ) -> requests.Response:
 529        if "/" in content:
 530            court, jurisdiction = re.split("\\s*/\\s*", content)
 531            self.set_document_court(document_uri, court)
 532            return self.set_document_jurisdiction(document_uri, jurisdiction)
 533        self.set_document_court(document_uri, content)
 534        return self.set_document_jurisdiction(document_uri, "")
 535
 536    def set_judgment_this_uri(
 537        self,
 538        judgment_uri: DocumentURIString,
 539    ) -> requests.Response:
 540        uri = self._format_uri_for_marklogic(judgment_uri)
 541        content_with_id = f"https://caselaw.nationalarchives.gov.uk/id/{judgment_uri.lstrip('/')}"
 542        content_without_id = f"https://caselaw.nationalarchives.gov.uk/{judgment_uri.lstrip('/')}"
 543        content_with_xml = f"https://caselaw.nationalarchives.gov.uk/{judgment_uri.lstrip('/')}/data.xml"
 544        vars: query_dicts.SetMetadataThisUriDict = {
 545            "uri": uri,
 546            "content_with_id": content_with_id,
 547            "content_without_id": content_without_id,
 548            "content_with_xml": content_with_xml,
 549        }
 550
 551        return self._send_to_eval(vars, "set_metadata_this_uri.xqy")
 552
 553    def save_locked_judgment_xml(
 554        self,
 555        judgment_uri: DocumentURIString,
 556        judgment_xml: bytes,
 557        annotation: VersionAnnotation,
 558    ) -> requests.Response:
 559        """assumes the judgment is already locked, does not unlock/check in
 560        note this version assumes the XML is raw bytes, rather than a tree..."""
 561
 562        validate_content_hash(judgment_xml)
 563        uri = self._format_uri_for_marklogic(judgment_uri)
 564
 565        annotation.set_calling_function("save_locked_judgment_xml")
 566        annotation.set_calling_agent(self.user_agent)
 567
 568        vars: query_dicts.UpdateLockedJudgmentDict = {
 569            "uri": uri,
 570            "judgment": judgment_xml.decode("utf-8"),
 571            "annotation": annotation.as_json,
 572        }
 573
 574        return self._send_to_eval(vars, "update_locked_judgment.xqy")
 575
 576    def insert_document_xml(
 577        self,
 578        document_uri: DocumentURIString,
 579        document_xml: Element,
 580        document_type: type[Document],
 581        annotation: VersionAnnotation,
 582    ) -> requests.Response:
 583        """
 584        Insert a new XML document into MarkLogic.
 585
 586        :param document_uri: The URI to insert the document at
 587        :param document_xml: The XML of the document to insert
 588        :param document_type: The type class of the document
 589        :param annotation: Annotations to record alongside this version
 590
 591        :return: The response object from MarkLogic
 592        """
 593        xml = ElementTree.tostring(document_xml)
 594
 595        uri = self._format_uri_for_marklogic(document_uri)
 596
 597        annotation.set_calling_function("insert_document_xml")
 598        annotation.set_calling_agent(self.user_agent)
 599
 600        vars: query_dicts.InsertDocumentDict = {
 601            "uri": uri,
 602            "type_collection": document_type.type_collection_name,
 603            "document": xml.decode("utf-8"),
 604            "annotation": annotation.as_json,
 605        }
 606
 607        return self._send_to_eval(vars, "insert_document.xqy")
 608
 609    def update_document_xml(
 610        self,
 611        document_uri: DocumentURIString,
 612        document_xml: Element,
 613        annotation: VersionAnnotation,
 614    ) -> requests.Response:
 615        """
 616        Updates an existing XML document in MarkLogic with a new version.
 617
 618        This uses `dls:document-checkout-update-checkin` to perform this in a single operation.
 619
 620        :param document_uri: The URI of the document to update
 621        :param document_xml: The new XML content of the document
 622        :param annotation: Annotations to record alongside this version
 623
 624        :return: The response object from MarkLogic
 625        """
 626        xml = ElementTree.tostring(document_xml)
 627
 628        uri = self._format_uri_for_marklogic(document_uri)
 629
 630        annotation.set_calling_function("update_document_xml")
 631        annotation.set_calling_agent(self.user_agent)
 632
 633        vars: query_dicts.UpdateDocumentDict = {
 634            "uri": uri,
 635            "judgment": xml.decode("utf-8"),
 636            "annotation": annotation.as_json,
 637        }
 638
 639        return self._send_to_eval(vars, "update_document.xqy")
 640
 641    def list_judgment_versions(
 642        self,
 643        judgment_uri: DocumentURIString,
 644    ) -> requests.Response:
 645        uri = self._format_uri_for_marklogic(judgment_uri)
 646        vars: query_dicts.ListJudgmentVersionsDict = {"uri": uri}
 647
 648        return self._send_to_eval(vars, "list_judgment_versions.xqy")
 649
 650    def checkout_judgment(
 651        self,
 652        judgment_uri: DocumentURIString,
 653        annotation: str = "",
 654        expires_at_midnight: bool = False,
 655        timeout_seconds: int = -1,
 656    ) -> requests.Response:
 657        """If timeout_seconds is -1, the lock never times out"""
 658        uri = self._format_uri_for_marklogic(judgment_uri)
 659        vars: query_dicts.CheckoutJudgmentDict = {
 660            "uri": uri,
 661            "annotation": annotation,
 662            "timeout": timeout_seconds,
 663        }
 664
 665        if expires_at_midnight:
 666            timeout = self.calculate_seconds_until_midnight()
 667            vars["timeout"] = timeout
 668
 669        return self._send_to_eval(vars, "checkout_judgment.xqy")
 670
 671    def checkin_judgment(self, judgment_uri: DocumentURIString) -> requests.Response:
 672        uri = self._format_uri_for_marklogic(judgment_uri)
 673        vars: query_dicts.CheckinJudgmentDict = {"uri": uri}
 674
 675        return self._send_to_eval(vars, "checkin_judgment.xqy")
 676
 677    def get_judgment_checkout_status(
 678        self,
 679        judgment_uri: DocumentURIString,
 680    ) -> requests.Response:
 681        uri = self._format_uri_for_marklogic(judgment_uri)
 682        vars: query_dicts.GetJudgmentCheckoutStatusDict = {"uri": uri}
 683
 684        return self._send_to_eval(vars, "get_judgment_checkout_status.xqy")
 685
 686    def get_judgment_checkout_status_message(
 687        self,
 688        judgment_uri: DocumentURIString,
 689    ) -> Optional[str]:
 690        """Return the annotation of the lock or `None` if there is no lock."""
 691        response = self.get_judgment_checkout_status(judgment_uri)
 692        if not response.content:
 693            return None
 694        content = decoder.MultipartDecoder.from_response(response).parts[0].text
 695        if content == "":
 696            return None
 697        response_xml = ElementTree.fromstring(content)
 698        return str(
 699            response_xml.find(
 700                "dls:annotation",
 701                namespaces={"dls": "http://marklogic.com/xdmp/dls"},
 702            ).text
 703        )
 704
 705    def get_judgment_version(
 706        self,
 707        judgment_uri: DocumentURIString,
 708        version: int,
 709    ) -> requests.Response:
 710        uri = self._format_uri_for_marklogic(judgment_uri)
 711        vars: query_dicts.GetJudgmentVersionDict = {"uri": uri, "version": str(version)}
 712
 713        return self._send_to_eval(vars, "get_judgment_version.xqy")
 714
 715    def validate_document(self, document_uri: DocumentURIString) -> bool:
 716        vars: query_dicts.ValidateDocumentDict = {
 717            "uri": self._format_uri_for_marklogic(document_uri),
 718        }
 719        response = self._send_to_eval(vars, "validate_document.xqy")
 720        content = decoder.MultipartDecoder.from_response(response).parts[0].text
 721        xml = ElementTree.fromstring(content)
 722        return (
 723            len(
 724                xml.findall(
 725                    ".//error:error",
 726                    {"error": "http://marklogic.com/xdmp/error"},
 727                ),
 728            )
 729            == 0
 730        )
 731
 732    def has_unique_content_hash(self, judgment_uri: DocumentURIString) -> bool:
 733        """
 734        Returns True if the content hash for this document is unique (not shared with other documents).
 735        """
 736        uri = self._format_uri_for_marklogic(judgment_uri)
 737        vars: CheckContentHashUniqueByUriDict = {"uri": uri}
 738        return self._eval_and_decode(vars, "check_content_hash_unique_by_uri.xqy") == "true"
 739
 740    def eval(
 741        self,
 742        xquery_path: str,
 743        vars: str,
 744        accept_header: str = "multipart/mixed",
 745        timeout: tuple[float, float] = (CONNECT_TIMEOUT, READ_TIMEOUT),
 746    ) -> requests.Response:
 747        headers = {
 748            "Content-type": "application/x-www-form-urlencoded",
 749            "Accept": accept_header,
 750        }
 751        data = {
 752            "xquery": Path(xquery_path).read_text(),
 753            "vars": vars,
 754        }
 755        path = "LATEST/eval"
 756
 757        if DEBUG:
 758            print(f"Sending {vars} to {xquery_path}")
 759
 760        response = self.session.request(
 761            "POST",
 762            url=self._path_to_request_url(path),
 763            headers=headers,
 764            data=data,
 765            timeout=timeout,
 766        )
 767        # Raise relevant exception for an erroneous response
 768        self._raise_for_status(response)
 769        return response
 770
 771    def invoke(
 772        self,
 773        module: str,
 774        vars: str,
 775        accept_header: str = "multipart/mixed",
 776    ) -> requests.Response:
 777        headers = {
 778            "Content-type": "application/x-www-form-urlencoded",
 779            "Accept": accept_header,
 780        }
 781        data = {
 782            "module": module,
 783            "vars": vars,
 784        }
 785        path = "LATEST/invoke"
 786        response = self.session.request(
 787            "POST",
 788            url=self._path_to_request_url(path),
 789            headers=headers,
 790            data=data,
 791        )
 792        # Raise relevant exception for an erroneous response
 793        self._raise_for_status(response)
 794        return response
 795
 796    def advanced_search(self, search_parameters: SearchParameters) -> requests.Response:
 797        """
 798        Performs a search on the entire document set.
 799
 800        :param query:
 801        :param court:
 802        :param judge:
 803        :param party:
 804        :param neutral_citation:
 805        :param document_name:
 806        :param consignment_number:
 807        :param specific_keyword:
 808        :param order:
 809        :param date_from:
 810        :param date_to:
 811        :param page:
 812        :param page_size:
 813        :param show_unpublished: If True, both published and unpublished documents will be returned
 814        :param only_unpublished: If True, will only return published documents. Ignores the value of show_unpublished
 815        :param collections:
 816        :return:
 817        """
 818        module = "/judgments/search/search-v2.xqy"  # as stored on Marklogic
 819        search_parameters.show_unpublished = self.verify_show_unpublished(
 820            search_parameters.show_unpublished,
 821        )
 822        vars = json.dumps(search_parameters.as_marklogic_payload())
 823        return self.invoke(module, vars)
 824
 825    def eval_xslt(
 826        self,
 827        judgment_uri: DocumentURIString,
 828        version_uri: Optional[DocumentURIString] = None,
 829        show_unpublished: bool = False,
 830        xsl_filename: str = DEFAULT_XSL_TRANSFORM,
 831        query: Optional[str] = None,
 832    ) -> requests.Response:
 833        marklogic_document_uri = self._format_uri_for_marklogic(judgment_uri)
 834        marklogic_document_version_uri = (
 835            MarkLogicDocumentVersionURIString(
 836                self._format_uri_for_marklogic(version_uri),
 837            )
 838            if version_uri
 839            else None
 840        )
 841
 842        image_location = os.getenv("XSLT_IMAGE_LOCATION", "")
 843
 844        show_unpublished = self.verify_show_unpublished(show_unpublished)
 845
 846        vars: query_dicts.XsltTransformDict = {
 847            "uri": marklogic_document_uri,
 848            "version_uri": marklogic_document_version_uri,
 849            "show_unpublished": show_unpublished,
 850            "img_location": image_location,
 851            "xsl_filename": xsl_filename,
 852            "query": query,
 853        }
 854
 855        return self._send_to_eval(vars, "xslt_transform.xqy")
 856
 857    def accessible_judgment_transformation(
 858        self,
 859        judgment_uri: DocumentURIString,
 860        version_uri: Optional[DocumentURIString] = None,
 861        show_unpublished: bool = False,
 862    ) -> requests.Response:
 863        return self.eval_xslt(
 864            judgment_uri,
 865            version_uri,
 866            show_unpublished,
 867            xsl_filename=DEFAULT_XSL_TRANSFORM,
 868        )
 869
 870    def original_judgment_transformation(
 871        self,
 872        judgment_uri: DocumentURIString,
 873        version_uri: Optional[DocumentURIString] = None,
 874        show_unpublished: bool = False,
 875    ) -> requests.Response:
 876        return self.eval_xslt(
 877            judgment_uri,
 878            version_uri,
 879            show_unpublished,
 880            xsl_filename="as-handed-down.xsl",
 881        )
 882
 883    def get_property(self, judgment_uri: DocumentURIString, name: str) -> str:
 884        uri = self._format_uri_for_marklogic(judgment_uri)
 885        vars: query_dicts.GetPropertyDict = {
 886            "uri": uri,
 887            "name": name,
 888        }
 889        return self._eval_and_decode(vars, "get_property.xqy")
 890
 891    def get_property_as_node(self, judgment_uri: DocumentURIString, name: str) -> Optional[etree._Element]:
 892        uri = self._format_uri_for_marklogic(judgment_uri)
 893        vars: query_dicts.GetPropertyAsNodeDict = {
 894            "uri": uri,
 895            "name": name,
 896        }
 897        value = self._eval_and_decode(vars, "get_property_as_node.xqy")
 898        if not value:
 899            return None
 900        return etree.fromstring(value)
 901
 902    def get_version_annotation(self, judgment_uri: DocumentURIString) -> str:
 903        uri = self._format_uri_for_marklogic(judgment_uri)
 904        vars: query_dicts.GetVersionAnnotationDict = {
 905            "uri": uri,
 906        }
 907        return self._eval_and_decode(vars, "get_version_annotation.xqy")
 908
 909    def get_version_created_datetime(self, judgment_uri: DocumentURIString) -> datetime:
 910        uri = self._format_uri_for_marklogic(judgment_uri)
 911        vars: query_dicts.GetVersionCreatedDict = {
 912            "uri": uri,
 913        }
 914        return datetime.strptime(
 915            self._eval_and_decode(vars, "get_version_created.xqy"),
 916            "%Y-%m-%dT%H:%M:%S.%f%z",
 917        )
 918
 919    def set_property(
 920        self,
 921        judgment_uri: DocumentURIString,
 922        name: str,
 923        value: str,
 924    ) -> requests.Response:
 925        uri = self._format_uri_for_marklogic(judgment_uri)
 926        vars: query_dicts.SetPropertyDict = {
 927            "uri": uri,
 928            "value": value,
 929            "name": name,
 930        }
 931
 932        return self._send_to_eval(vars, "set_property.xqy")
 933
 934    def set_property_as_node(
 935        self,
 936        judgment_uri: DocumentURIString,
 937        name: str,
 938        value: etree._Element,
 939    ) -> requests.Response:
 940        """Given a root node, set the value of the MarkLogic property for a document to the _contents_ of that root node. The root node itself is discarded."""
 941        uri = self._format_uri_for_marklogic(judgment_uri)
 942        vars: query_dicts.SetPropertyAsNodeDict = {
 943            "uri": uri,
 944            "value": etree.tostring(value).decode(),
 945            "name": name,
 946        }
 947
 948        return self._send_to_eval(vars, "set_property_as_node.xqy")
 949
 950    def set_boolean_property(
 951        self,
 952        judgment_uri: DocumentURIString,
 953        name: str,
 954        value: bool,
 955    ) -> requests.Response:
 956        uri = self._format_uri_for_marklogic(judgment_uri)
 957        string_value = "true" if value else "false"
 958        vars: query_dicts.SetBooleanPropertyDict = {
 959            "uri": uri,
 960            "value": string_value,
 961            "name": name,
 962        }
 963        """
 964        Set a property within MarkLogic which is specifically a boolean.
 965
 966        Since XML has no concept of boolean, the actual value in the database is set to `"true"` or `"false"`.
 967        """
 968        return self._send_to_eval(vars, "set_boolean_property.xqy")
 969
 970    def get_boolean_property(self, judgment_uri: DocumentURIString, name: str) -> bool:
 971        """
 972        Get a property from MarkLogic which is specifically a boolean.
 973
 974        :return: `True` if the property exists and has a value of `"true"`, otherwise `False`
 975        """
 976        content = self.get_property(judgment_uri, name)
 977        return content == "true"
 978
 979    def set_datetime_property(
 980        self,
 981        judgment_uri: DocumentURIString,
 982        name: str,
 983        value: datetime,
 984    ) -> requests.Response:
 985        """Set a property within MarkLogic which is specifically a datetime."""
 986        uri = self._format_uri_for_marklogic(judgment_uri)
 987        vars: query_dicts.SetDatetimePropertyDict = {
 988            "uri": uri,
 989            "value": value.isoformat(),
 990            "name": name,
 991        }
 992        return self._send_to_eval(vars, "set_datetime_property.xqy")
 993
 994    def get_datetime_property(self, judgment_uri: DocumentURIString, name: str) -> Optional[datetime]:
 995        """
 996        Get a property from MarkLogic which is specifically a datetime.
 997
 998        :return: A datetime with the value of the property, or `None` if it does not exist
 999        """
1000        content = self.get_property(judgment_uri, name)
1001
1002        if content:
1003            return isoparse(content)
1004
1005        return None
1006
1007    def set_published(
1008        self,
1009        judgment_uri: DocumentURIString,
1010        published: bool,
1011    ) -> requests.Response:
1012        return self.set_boolean_property(judgment_uri, "published", published)
1013
1014    def get_published(self, judgment_uri: DocumentURIString) -> bool:
1015        return self.get_boolean_property(judgment_uri, "published")
1016
1017    def get_last_modified(self, judgment_uri: DocumentURIString) -> str:
1018        uri = self._format_uri_for_marklogic(judgment_uri)
1019        vars: query_dicts.GetLastModifiedDict = {
1020            "uri": uri,
1021        }
1022
1023        response = self._send_to_eval(vars, "get_last_modified.xqy")
1024
1025        if not response.text:
1026            return ""
1027
1028        content = str(decoder.MultipartDecoder.from_response(response).parts[0].text)
1029        return content
1030
1031    def delete_judgment(self, judgment_uri: DocumentURIString) -> requests.Response:
1032        uri = self._format_uri_for_marklogic(judgment_uri)
1033        vars: query_dicts.DeleteJudgmentDict = {"uri": uri}
1034        return self._send_to_eval(vars, "delete_judgment.xqy")
1035
1036    def copy_document(
1037        self,
1038        old: DocumentURIString,
1039        new: DocumentURIString,
1040    ) -> requests.Response:
1041        old_uri = self._format_uri_for_marklogic(old)
1042        new_uri = self._format_uri_for_marklogic(new)
1043
1044        vars: query_dicts.CopyDocumentDict = {
1045            "old_uri": old_uri,
1046            "new_uri": new_uri,
1047        }
1048        return self._send_to_eval(vars, "copy_document.xqy")
1049
1050    def break_checkout(self, judgment_uri: DocumentURIString) -> requests.Response:
1051        uri = self._format_uri_for_marklogic(judgment_uri)
1052        vars: query_dicts.BreakJudgmentCheckoutDict = {
1053            "uri": uri,
1054        }
1055        return self._send_to_eval(vars, "break_judgment_checkout.xqy")
1056
1057    def user_has_privilege(
1058        self,
1059        username: str,
1060        privilege_uri: MarkLogicPrivilegeURIString,
1061        privilege_action: str,
1062    ) -> requests.Response:
1063        vars: query_dicts.UserHasPrivilegeDict = {
1064            "user": username,
1065            "privilege_uri": privilege_uri,
1066            "privilege_action": privilege_action,
1067        }
1068        return self._send_to_eval(vars, "user_has_privilege.xqy")
1069
1070    def user_can_view_unpublished_judgments(self, username: str) -> bool:
1071        if self.user_has_admin_role(username):
1072            return True
1073
1074        check_privilege = self.user_has_privilege(
1075            username,
1076            MarkLogicPrivilegeURIString(
1077                "https://caselaw.nationalarchives.gov.uk/custom/privileges/can-view-unpublished-documents",
1078            ),
1079            "execute",
1080        )
1081        return get_single_string_from_marklogic_response(check_privilege).lower() == "true"
1082
1083    def user_has_role(self, username: str, role: str) -> requests.Response:
1084        vars: query_dicts.UserHasRoleDict = {
1085            "user": username,
1086            "role": role,
1087        }
1088        return self._send_to_eval(vars, "user_has_role.xqy")
1089
1090    def user_has_admin_role(self, username: str) -> bool:
1091        check_role = self.user_has_role(
1092            username,
1093            "admin",
1094        )
1095        multipart_data = decoder.MultipartDecoder.from_response(check_role)
1096        result = str(multipart_data.parts[0].text)
1097        return result.lower() == "true"
1098
1099    def calculate_seconds_until_midnight(self, now: Optional[datetime] = None) -> int:
1100        """
1101        Get timedelta until end of day on the datetime passed, or current time.
1102        https://stackoverflow.com/questions/45986035/seconds-until-end-of-day-in-python
1103        """
1104        if not now:
1105            now = datetime.now()
1106        tomorrow = now + timedelta(days=1)
1107        difference = datetime.combine(tomorrow, time.min) - now
1108
1109        return difference.seconds
1110
1111    def verify_show_unpublished(self, show_unpublished: bool) -> bool:
1112        if show_unpublished and not self.user_can_view_unpublished_judgments(
1113            self.username,
1114        ):
1115            return False
1116        return show_unpublished
1117
1118    def get_properties_for_search_results(
1119        self,
1120        judgment_uris: list[DocumentURIString],
1121    ) -> str:
1122        uris = [self._format_uri_for_marklogic(judgment_uri) for judgment_uri in judgment_uris]
1123        vars: query_dicts.GetPropertiesForSearchResultsDict = {"uris": uris}
1124        response = self._send_to_eval(vars, "get_properties_for_search_results.xqy")
1125        return get_single_string_from_marklogic_response(response)
1126
1127    def search_and_decode_response(self, search_parameters: SearchParameters) -> bytes:
1128        response = self.advanced_search(search_parameters)
1129        return get_single_bytestring_from_marklogic_response(response)
1130
1131    def search_judgments_and_decode_response(
1132        self,
1133        search_parameters: SearchParameters,
1134    ) -> bytes:
1135        search_parameters.collections = [DOCUMENT_COLLECTION_URI_JUDGMENT]
1136        return self.search_and_decode_response(search_parameters)
1137
1138    def update_document_uri(self, old_uri: DocumentURIString, new_citation: NeutralCitationString) -> DocumentURIString:
1139        """
1140        Move the document at old_uri to the correct location based on the neutral citation
1141        The new neutral citation *must* not already exist (that is handled elsewhere)
1142        This might not be needed; changing the URI/neutral citation is vanishingly rare
1143        """
1144        return move.update_document_uri(old_uri, new_citation, api_client=self)
1145
1146    def get_combined_stats_table(self) -> list[list[Any]]:
1147        """Run the combined statistics table xquery and return the result as a list of lists, each representing a table
1148        row."""
1149        results: list[list[Any]] = json.loads(
1150            get_single_string_from_marklogic_response(
1151                self._send_to_eval({}, "get_combined_stats_table.xqy"),
1152            ),
1153        )
1154
1155        return results
1156
1157    def get_highest_enrichment_version(self) -> tuple[int, int]:
1158        """This gets the highest enrichment version in the database,
1159        so if nothing has been enriched with the most recent version of enrichment,
1160        this won't reflect that change."""
1161        table = json.loads(
1162            get_single_string_from_marklogic_response(
1163                self._send_to_eval(
1164                    {},
1165                    "get_highest_enrichment_version.xqy",
1166                ),
1167            ),
1168        )
1169
1170        return (int(table[1][1]), int(table[1][2]))
1171
1172    def get_pending_enrichment_for_version(
1173        self,
1174        target_enrichment_version: tuple[int, int],
1175        target_parser_version: tuple[int, int],
1176        maximum_records: int = 1000,
1177    ) -> list[list[Any]]:
1178        """Retrieve documents which are not yet enriched with a given version."""
1179        vars: query_dicts.GetPendingEnrichmentForVersionDict = {
1180            "target_enrichment_major_version": target_enrichment_version[0],
1181            "target_enrichment_minor_version": target_enrichment_version[1],
1182            "target_parser_major_version": target_parser_version[0],
1183            "target_parser_minor_version": target_parser_version[1],
1184            "maximum_records": maximum_records,
1185        }
1186        results: list[list[Any]] = json.loads(
1187            get_single_string_from_marklogic_response(
1188                self._send_to_eval(
1189                    vars,
1190                    "get_pending_enrichment_for_version.xqy",
1191                ),
1192            ),
1193        )
1194
1195        return results
1196
1197    def get_recently_enriched(
1198        self,
1199    ) -> list[list[Any]]:
1200        """Retrieve documents which are not yet enriched with a given version."""
1201        results: list[list[Any]] = json.loads(
1202            get_single_string_from_marklogic_response(
1203                self._send_to_eval(
1204                    {},
1205                    "get_recently_enriched.xqy",
1206                ),
1207            ),
1208        )
1209
1210        return results
1211
1212    def get_highest_parser_version(self) -> tuple[int, int]:
1213        """This gets the highest parser version in the database, so if nothing has been parsed with the most recent version of the parser, this won't reflect that change."""
1214        table = json.loads(
1215            get_single_string_from_marklogic_response(
1216                self._send_to_eval(
1217                    {},
1218                    "get_highest_parser_version.xqy",
1219                ),
1220            ),
1221        )
1222
1223        return (int(table[1][1]), int(table[1][2]))
1224
1225    def get_documents_pending_parse_for_version(
1226        self,
1227        target_version: tuple[int, int],
1228        maximum_records: int = 1000,
1229    ) -> list[list[Any]]:
1230        """Retrieve a list of documents which are not yet parsed with a given version."""
1231        vars: query_dicts.GetPendingParseForVersionDocumentsDict = {
1232            "target_major_version": target_version[0],
1233            "target_minor_version": target_version[1],
1234            "maximum_records": maximum_records,
1235        }
1236        results: list[list[Any]] = json.loads(
1237            get_single_string_from_marklogic_response(
1238                self._send_to_eval(
1239                    vars,
1240                    "get_pending_parse_for_version_documents.xqy",
1241                ),
1242            ),
1243        )
1244
1245        return results
1246
1247    def get_count_pending_parse_for_version(
1248        self,
1249        target_version: tuple[int, int],
1250    ) -> int:
1251        """Get the total number of documents which are not yet parsed with a given version."""
1252        vars: query_dicts.GetPendingParseForVersionCountDict = {
1253            "target_major_version": target_version[0],
1254            "target_minor_version": target_version[1],
1255        }
1256        results = json.loads(
1257            get_single_string_from_marklogic_response(
1258                self._send_to_eval(
1259                    vars,
1260                    "get_pending_parse_for_version_count.xqy",
1261                ),
1262            ),
1263        )
1264
1265        return int(results[1][0])
1266
1267    def get_recently_parsed(
1268        self,
1269    ) -> list[list[Any]]:
1270        """Retrieve documents which are not yet enriched with a given version."""
1271        results: list[list[Any]] = json.loads(
1272            get_single_string_from_marklogic_response(
1273                self._send_to_eval(
1274                    {},
1275                    "get_recently_parsed.xqy",
1276                ),
1277            ),
1278        )
1279
1280        return results
1281
1282    def get_locked_documents(
1283        self,
1284    ) -> list[DocumentLock]:
1285        """Retrieve all currently locked documents."""
1286        results = [
1287            DocumentLock.from_string(lock)
1288            for lock in get_multipart_strings_from_marklogic_response(
1289                self._send_to_eval({}, "get_locked_documents.xqy")
1290            )
1291        ]
1292
1293        return sorted(results, key=lambda lock: lock.timestamp)
1294
1295    def get_missing_fclid(
1296        self,
1297        maximum_records: int = 50,
1298    ) -> list[str]:
1299        """Retrieve the URIs of published documents which do not have an identifier in the `fclid` schema."""
1300        vars: query_dicts.GetMissingFclidDict = {
1301            "maximum_records": maximum_records,
1302        }
1303
1304        results: list[str] = get_multipart_strings_from_marklogic_response(
1305            self._send_to_eval(
1306                vars,
1307                "get_missing_fclid.xqy",
1308            )
1309        )
1310
1311        return results
1312
1313    def resolve_from_identifier_slug(
1314        self, identifier_slug: DocumentIdentifierSlug, published_only: bool = True
1315    ) -> IdentifierResolutions:
1316        """Given a PUI/EUI url, look up the precomputed slug and return the
1317        MarkLogic document URIs which match that slug. Multiple returns should be anticipated"""
1318        vars: query_dicts.ResolveFromIdentifierSlugDict = {
1319            "identifier_slug": identifier_slug,
1320            "published_only": int(published_only),
1321        }
1322        raw_results: list[str] = get_multipart_strings_from_marklogic_response(
1323            self._send_to_eval(
1324                vars,
1325                "resolve_from_identifier_slug.xqy",
1326            ),
1327        )
1328        return IdentifierResolutions.from_marklogic_output(raw_results)
1329
1330    def resolve_from_identifier_value(
1331        self, identifier_value: DocumentIdentifierValue, published_only: bool = True
1332    ) -> IdentifierResolutions:
1333        """Given a PUI/EUI url, look up the precomputed slug and return the
1334        MarkLogic document URIs which match that slug. Multiple returns should be anticipated"""
1335        vars: query_dicts.ResolveFromIdentifierValueDict = {
1336            "identifier_value": identifier_value,
1337            "published_only": int(published_only),
1338        }
1339        raw_results: list[str] = get_multipart_strings_from_marklogic_response(
1340            self._send_to_eval(
1341                vars,
1342                "resolve_from_identifier_value.xqy",
1343            ),
1344        )
1345        return IdentifierResolutions.from_marklogic_output(raw_results)
1346
1347    def get_next_document_sequence_number(self) -> int:
1348        """Increment the MarkLogic sequence number by one and return the value."""
1349        return int(self._eval_and_decode({}, "get_next_document_sequence_number.xqy"))
env = <environ.environ.Env object>
CONNECT_TIMEOUT = 3.05
READ_TIMEOUT = 10.0
ROOT_DIR = '/home/runner/work/ds-caselaw-custom-api-client/ds-caselaw-custom-api-client/src/caselawclient'
DEFAULT_XSL_TRANSFORM = 'accessible-html.xsl'
DEFAULT_USER_AGENT = 'ds-caselaw-marklogic-api-client/44.4.5'
DEBUG: bool = False
class NoResponse(builtins.Exception):
class NoResponse(Exception):
    """
    A requests HTTPError arrived without a response attached. We expect this will never happen.
    """

A requests HTTPError has no response. We expect this will never happen.

class MultipartResponseLongerThanExpected(builtins.Exception):
class MultipartResponseLongerThanExpected(Exception):
    """MarkLogic has returned a multipart response with more than one part, where only a single part was expected."""

MarkLogic has returned a multipart response with more than one part, where only a single part was expected.

def get_multipart_strings_from_marklogic_response(response: requests.models.Response) -> list[str]:
 89def get_multipart_strings_from_marklogic_response(
 90    response: requests.Response,
 91) -> list[str]:
 92    """
 93    Given a HTTP response from a MarkLogic server, extract the text content from each part of the response.
 94
 95    :param response: A multipart HTTP response
 96
 97    :return: A list of the text within each part of the response
 98    """
 99    if not (response.content):
100        return []
101
102    multipart_data = decoder.MultipartDecoder.from_response(response)
103
104    return [part.text for part in multipart_data.parts]

Given an HTTP response from a MarkLogic server, extract the text content from each part of the response.

Parameters
  • response: A multipart HTTP response
Returns

A list of the text within each part of the response

def get_multipart_bytes_from_marklogic_response(response: requests.models.Response) -> list[bytes]:
def get_multipart_bytes_from_marklogic_response(
    response: requests.Response,
) -> list[bytes]:
    """
    Given an HTTP response from a MarkLogic server, extract the raw content from each part of the response.

    :param response: A multipart HTTP response

    :return: A list of the content of each part of the response; empty if the response has no content
    """
    if response.content:
        decoded = decoder.MultipartDecoder.from_response(response)
        return [part.content for part in decoded.parts]

    return []
def get_single_string_from_marklogic_response(response: requests.models.Response) -> str:
def get_single_string_from_marklogic_response(
    response: requests.Response,
) -> str:
    """
    Given an HTTP response from a MarkLogic server, assuming the response contains a single part, extract the text
    content of the response.

    :param response: A multipart HTTP response

    :return: The text of the response, or `""` if the response has no parts

    :raises MultipartResponseLongerThanExpected: If the response from MarkLogic has more than one part
    """
    parts = get_multipart_strings_from_marklogic_response(response)

    if not parts:
        # TODO: This should strictly speaking be None, but fixing this involves refactoring a lot of other stuff which
        # relies on "" being falsy.
        return ""

    part_count = len(parts)
    if part_count > 1:
        raise MultipartResponseLongerThanExpected(
            f"Response returned {part_count} multipart items, expected 1",
        )

    (only_part,) = parts
    return only_part

Given an HTTP response from a MarkLogic server, assuming the response contains a single part, extract the text content of the response.

Parameters
  • response: A multipart HTTP response
Returns

The text of the response

Raises
  • MultipartResponseLongerThanExpected: If the response from MarkLogic has more than one part
def get_single_bytestring_from_marklogic_response(response: requests.models.Response) -> bytes:
def get_single_bytestring_from_marklogic_response(
    response: requests.Response,
) -> bytes:
    """
    Given an HTTP response from a MarkLogic server, assuming the response contains a single part, extract the raw
    content of the response.

    :param response: A multipart HTTP response

    :return: The content of the response, or `b""` if the response has no parts

    :raises MultipartResponseLongerThanExpected: If the response from MarkLogic has more than one part
    """
    parts = get_multipart_bytes_from_marklogic_response(response)

    if not parts:
        # TODO: This should strictly speaking be None, but fixing this involves refactoring a lot of other stuff which
        # relies on "" being falsy.
        return b""

    part_count = len(parts)
    if part_count > 1:
        raise MultipartResponseLongerThanExpected(
            f"Response returned {part_count} multipart items, expected 1",
        )

    (only_part,) = parts
    return only_part
class MarklogicApiClient:
 166class MarklogicApiClient:
 167    """
 168    The base class for interacting with a MarkLogic instance.
 169    """
 170
 171    http_error_classes: dict[int, Type[MarklogicAPIError]] = {
 172        400: MarklogicBadRequestError,
 173        401: MarklogicUnauthorizedError,
 174        403: MarklogicNotPermittedError,
 175        404: MarklogicResourceNotFoundError,
 176        504: GatewayTimeoutError,
 177    }
 178    error_code_classes: dict[str, Type[MarklogicAPIError]] = {
 179        "XDMP-DOCNOTFOUND": MarklogicResourceNotFoundError,
 180        "XDMP-LOCKCONFLICT": MarklogicResourceLockedError,
 181        "XDMP-LOCKED": MarklogicResourceLockedError,
 182        "DLS-UNMANAGED": MarklogicResourceUnmanagedError,
 183        "DLS-NOTCHECKEDOUT": MarklogicResourceNotCheckedOutError,
 184        "DLS-CHECKOUTCONFLICT": MarklogicCheckoutConflictError,
 185        "SEC-PRIVDNE": MarklogicNotPermittedError,
 186        "XDMP-VALIDATE.*": MarklogicValidationFailedError,
 187        "FCL-DOCUMENTNOTFOUND.*": DocumentNotFoundError,
 188    }
 189
 190    default_http_error_class = MarklogicCommunicationError
 191
 192    def __init__(
 193        self,
 194        host: str,
 195        username: str,
 196        password: str,
 197        use_https: bool,
 198        user_agent: str = DEFAULT_USER_AGENT,
 199    ) -> None:
 200        self.host = host
 201        self.username = username
 202        self.password = password
 203        self.base_url = f"{'https' if use_https else 'http'}://{self.host}:8011"
 204        # Apply auth / common headers to the session
 205        self.session = requests.Session()
 206        self.session.auth = HTTPBasicAuth(username, password)
 207        self.session.headers.update({"User-Agent": user_agent})
 208        self.user_agent = user_agent
 209
 210    def get_press_summaries_for_document_uri(
 211        self,
 212        uri: DocumentURIString,
 213    ) -> list[PressSummary]:
 214        """
 215        Returns a list of PressSummary objects associated with a given Document URI
 216        """
 217        vars: query_dicts.GetComponentsForDocumentDict = {
 218            "parent_uri": uri,
 219            "component": "pressSummary",
 220        }
 221        response = self._send_to_eval(vars, "get_components_for_document.xqy")
 222        uris = get_multipart_strings_from_marklogic_response(response)
 223        return [
 224            PressSummary(DocumentURIString(uri.strip("/").strip(".xml")), self) for uri in uris
 225        ]  # TODO: Migrate this strip behaviour into proper manipulation of a MarkLogicURIString
 226
 227    def get_document_by_uri(
 228        self,
 229        uri: DocumentURIString,
 230        search_query: Optional[str] = None,
 231    ) -> Document:
 232        document_type_class = self.get_document_type_from_uri(uri)
 233        return document_type_class(uri, self, search_query=search_query)
 234
 235    def get_document_type_from_uri(self, uri: DocumentURIString) -> Type[Document]:
 236        vars: query_dicts.DocumentCollectionsDict = {
 237            "uri": self._format_uri_for_marklogic(uri),
 238        }
 239        response = self._send_to_eval(vars, "document_collections.xqy")
 240        collections = get_multipart_strings_from_marklogic_response(response)
 241
 242        if DOCUMENT_COLLECTION_URI_JUDGMENT in collections:
 243            return Judgment
 244        if DOCUMENT_COLLECTION_URI_PRESS_SUMMARY in collections:
 245            return PressSummary
 246        return Document
 247
 248    def _get_error_code_class(self, error_code: str) -> Type[MarklogicAPIError]:
 249        """
 250        Get the exception type for a MarkLogic error code, or the first part of one
 251        """
 252        for regex, error in self.error_code_classes.items():
 253            if re.fullmatch(regex, error_code):
 254                return error
 255        print(f"No error code match found for {error_code}")
 256        return self.default_http_error_class
 257
 258    def _path_to_request_url(self, path: str) -> str:
 259        return f"{self.base_url}/{path.lstrip('/')}"
 260
 261    @classmethod
 262    def _get_error_code(cls, content_as_xml: Optional[str]) -> str:
 263        logging.warning(
 264            "XMLTools is deprecated and will be removed in later versions. "
 265            "Use methods from MarklogicApiClient.Client instead.",
 266        )
 267        if not content_as_xml:
 268            return "Unknown error, Marklogic returned a null or empty response"
 269        try:
 270            xml = fromstring(content_as_xml)
 271            return str(
 272                xml.find(
 273                    "message-code",
 274                    namespaces={"": "http://marklogic.com/xdmp/error"},
 275                ).text
 276            )
 277        except (ParseError, TypeError, AttributeError):
 278            return "Unknown error, Marklogic returned a null or empty response"
 279
    def _raise_for_status(self, response: requests.Response) -> None:
        """
        Raise a MarkLogic-specific exception if `response` has an error HTTP status.

        The exception class is looked up from the status code in `http_error_classes`. If the status code has no
        specific class, the error code is parsed from the response body and looked up with `_get_error_code_class`.
        The raised exception carries the original response as its `response` attribute.

        :param response: The response to check

        :raises NoResponse: If the underlying `HTTPError` has no response attached
        """
        try:
            response.raise_for_status()
        except requests.exceptions.HTTPError as e:
            if e.response is None:
                raise NoResponse
            status_code = e.response.status_code
            new_error_class = self.http_error_classes.get(
                status_code,
                self.default_http_error_class,
            )
            # Prefer a pretty-printed JSON body in the message; fall back to the raw text if it isn't JSON.
            try:
                response_body = json.dumps(response.json(), indent=4)
            except requests.JSONDecodeError:
                response_body = response.text

            if new_error_class == self.default_http_error_class:
                # Attempt to decode the error code from the response

                error_code = self._get_error_code(response.content.decode("utf-8"))

                new_error_class = self._get_error_code_class(error_code)

            new_exception = new_error_class(
                f"{e}. Response body:\n{response_body}",
            )
            # Attach the response so callers catching the exception can inspect it.
            new_exception.response = response
            raise new_exception
 308
 309    def _format_uri_for_marklogic(
 310        self,
 311        uri: DocumentURIString,
 312    ) -> MarkLogicDocumentURIString:
 313        """
 314        MarkLogic requires a document URI that begins with a slash `/` and ends in `.xml`. This method ensures any takes
 315        a `DocumentURIString` and converts it to a MarkLogic-friendly `MarkLogicDocumentURIString`.
 316
 317        :return: A `MarkLogicDocumentURIString` at which the document at the given `DocumentURIString` can be located
 318            within MarkLogic.
 319        """
 320        return MarkLogicDocumentURIString(f"/{uri.lstrip('/').rstrip('/')}.xml")
 321
 322    def _xquery_path(self, xquery_file_name: str) -> str:
 323        return os.path.join(ROOT_DIR, "xquery", xquery_file_name)
 324
 325    def _send_to_eval(
 326        self,
 327        vars: query_dicts.MarkLogicAPIDict,
 328        xquery_file_name: str,
 329        timeout: tuple[float, float] = (CONNECT_TIMEOUT, READ_TIMEOUT),
 330    ) -> requests.Response:
 331        return self.eval(
 332            self._xquery_path(xquery_file_name),
 333            vars=json.dumps(vars),
 334            accept_header="application/xml",
 335            timeout=timeout,
 336        )
 337
 338    def _eval_and_decode(
 339        self,
 340        vars: query_dicts.MarkLogicAPIDict,
 341        xquery_file_name: str,
 342    ) -> str:
 343        response = self._send_to_eval(vars, xquery_file_name)
 344        return get_single_string_from_marklogic_response(response)
 345
 346    def _eval_as_bytes(
 347        self,
 348        vars: query_dicts.MarkLogicAPIDict,
 349        xquery_file_name: str,
 350    ) -> bytes:
 351        response = self._send_to_eval(vars, xquery_file_name)
 352        return get_single_bytestring_from_marklogic_response(response)
 353
 354    def prepare_request_kwargs(
 355        self,
 356        method: str,
 357        path: str,
 358        body: Optional[str] = None,
 359        data: Optional[dict[str, Any]] = None,
 360    ) -> dict[str, Any]:
 361        kwargs = dict(url=self._path_to_request_url(path))
 362        if data is not None:
 363            data = {k: v for k, v in data.items() if v is not None}
 364            if method == "GET":
 365                kwargs["params"] = data  # type: ignore
 366            else:
 367                kwargs["data"] = json.dumps(data)
 368        if body is not None:
 369            kwargs["data"] = body
 370        return kwargs
 371
 372    def make_request(
 373        self,
 374        method: str,
 375        path: str,
 376        headers: CaseInsensitiveDict[Union[str, Any]],
 377        body: Optional[str] = None,
 378        data: Optional[dict[str, Any]] = None,
 379    ) -> requests.Response:
 380        kwargs = self.prepare_request_kwargs(method, path, body, data)
 381        self.session.headers = headers
 382        response = self.session.request(method, **kwargs)
 383        # Raise relevant exception for an erroneous response
 384        self._raise_for_status(response)
 385        return response
 386
 387    def GET(self, path: str, headers: dict[str, Any], **data: Any) -> requests.Response:
 388        logging.warning("GET() is deprecated, use eval() or invoke()")
 389        return self.make_request("GET", path, headers, data)  # type: ignore
 390
 391    def POST(
 392        self,
 393        path: str,
 394        headers: dict[str, Any],
 395        **data: Any,
 396    ) -> requests.Response:
 397        logging.warning("POST() is deprecated, use eval() or invoke()")
 398        return self.make_request("POST", path, headers, data)  # type: ignore
 399
 400    def document_exists(self, document_uri: DocumentURIString) -> bool:
 401        uri = self._format_uri_for_marklogic(document_uri)
 402        vars: query_dicts.DocumentExistsDict = {
 403            "uri": uri,
 404        }
 405        decoded_response = self._eval_and_decode(vars, "document_exists.xqy")
 406
 407        if decoded_response == "true":
 408            return True
 409        if decoded_response == "false":
 410            return False
 411        raise RuntimeError("Marklogic response was neither true nor false")
 412
 413    def get_judgment_xml_bytestring(
 414        self,
 415        judgment_uri: DocumentURIString,
 416        version_uri: Optional[DocumentURIString] = None,
 417        show_unpublished: bool = False,
 418        search_query: Optional[str] = None,
 419    ) -> bytes:
 420        marklogic_document_uri = self._format_uri_for_marklogic(judgment_uri)
 421        marklogic_document_version_uri = (
 422            MarkLogicDocumentVersionURIString(
 423                self._format_uri_for_marklogic(version_uri),
 424            )
 425            if version_uri
 426            else None
 427        )
 428        show_unpublished = self.verify_show_unpublished(show_unpublished)
 429
 430        vars: query_dicts.GetJudgmentDict = {
 431            "uri": marklogic_document_uri,
 432            "version_uri": marklogic_document_version_uri,
 433            "show_unpublished": show_unpublished,
 434            "search_query": search_query,
 435        }
 436
 437        response = self._eval_as_bytes(vars, "get_judgment.xqy")
 438        if not response:
 439            raise MarklogicNotPermittedError(
 440                "The document is not published and show_unpublished was not set",
 441            )
 442
 443        return response
 444
 445    def get_judgment_xml(
 446        self,
 447        judgment_uri: DocumentURIString,
 448        version_uri: Optional[DocumentURIString] = None,
 449        show_unpublished: bool = False,
 450        search_query: Optional[str] = None,
 451    ) -> str:
 452        return self.get_judgment_xml_bytestring(
 453            judgment_uri,
 454            version_uri,
 455            show_unpublished,
 456            search_query=search_query,
 457        ).decode(encoding="utf-8")
 458
 459    def set_document_name(
 460        self,
 461        document_uri: DocumentURIString,
 462        content: str,
 463    ) -> requests.Response:
 464        uri = self._format_uri_for_marklogic(document_uri)
 465        vars: query_dicts.SetMetadataNameDict = {"uri": uri, "content": content}
 466        return self._send_to_eval(vars, "set_metadata_name.xqy")
 467
 468    def set_judgment_date(
 469        self,
 470        judgment_uri: DocumentURIString,
 471        content: str,
 472    ) -> requests.Response:
 473        warnings.warn(
 474            "set_judgment_date() is deprecated, use set_document_work_expression_date()",
 475            DeprecationWarning,
 476            stacklevel=2,
 477        )
 478        return self.set_document_work_expression_date(judgment_uri, content)
 479
 480    def set_document_work_expression_date(
 481        self,
 482        document_uri: DocumentURIString,
 483        content: str,
 484    ) -> requests.Response:
 485        uri = self._format_uri_for_marklogic(document_uri)
 486        vars: query_dicts.SetMetadataWorkExpressionDateDict = {
 487            "uri": uri,
 488            "content": content,
 489        }
 490
 491        return self._send_to_eval(vars, "set_metadata_work_expression_date.xqy")
 492
 493    def set_judgment_citation(
 494        self,
 495        judgment_uri: DocumentURIString,
 496        content: str,
 497    ) -> requests.Response:
 498        uri = self._format_uri_for_marklogic(judgment_uri)
 499        vars: query_dicts.SetMetadataCitationDict = {
 500            "uri": uri,
 501            "content": content.strip(),
 502        }
 503
 504        return self._send_to_eval(vars, "set_metadata_citation.xqy")
 505
 506    def set_document_court(
 507        self,
 508        document_uri: DocumentURIString,
 509        content: str,
 510    ) -> requests.Response:
 511        uri = self._format_uri_for_marklogic(document_uri)
 512        vars: query_dicts.SetMetadataCourtDict = {"uri": uri, "content": content}
 513
 514        return self._send_to_eval(vars, "set_metadata_court.xqy")
 515
 516    def set_document_jurisdiction(
 517        self,
 518        document_uri: DocumentURIString,
 519        content: str,
 520    ) -> requests.Response:
 521        uri = self._format_uri_for_marklogic(document_uri)
 522        vars: query_dicts.SetMetadataJurisdictionDict = {"uri": uri, "content": content}
 523        return self._send_to_eval(vars, "set_metadata_jurisdiction.xqy")
 524
 525    def set_document_court_and_jurisdiction(
 526        self,
 527        document_uri: DocumentURIString,
 528        content: str,
 529    ) -> requests.Response:
 530        if "/" in content:
 531            court, jurisdiction = re.split("\\s*/\\s*", content)
 532            self.set_document_court(document_uri, court)
 533            return self.set_document_jurisdiction(document_uri, jurisdiction)
 534        self.set_document_court(document_uri, content)
 535        return self.set_document_jurisdiction(document_uri, "")
 536
 537    def set_judgment_this_uri(
 538        self,
 539        judgment_uri: DocumentURIString,
 540    ) -> requests.Response:
 541        uri = self._format_uri_for_marklogic(judgment_uri)
 542        content_with_id = f"https://caselaw.nationalarchives.gov.uk/id/{judgment_uri.lstrip('/')}"
 543        content_without_id = f"https://caselaw.nationalarchives.gov.uk/{judgment_uri.lstrip('/')}"
 544        content_with_xml = f"https://caselaw.nationalarchives.gov.uk/{judgment_uri.lstrip('/')}/data.xml"
 545        vars: query_dicts.SetMetadataThisUriDict = {
 546            "uri": uri,
 547            "content_with_id": content_with_id,
 548            "content_without_id": content_without_id,
 549            "content_with_xml": content_with_xml,
 550        }
 551
 552        return self._send_to_eval(vars, "set_metadata_this_uri.xqy")
 553
 554    def save_locked_judgment_xml(
 555        self,
 556        judgment_uri: DocumentURIString,
 557        judgment_xml: bytes,
 558        annotation: VersionAnnotation,
 559    ) -> requests.Response:
 560        """assumes the judgment is already locked, does not unlock/check in
 561        note this version assumes the XML is raw bytes, rather than a tree..."""
 562
 563        validate_content_hash(judgment_xml)
 564        uri = self._format_uri_for_marklogic(judgment_uri)
 565
 566        annotation.set_calling_function("save_locked_judgment_xml")
 567        annotation.set_calling_agent(self.user_agent)
 568
 569        vars: query_dicts.UpdateLockedJudgmentDict = {
 570            "uri": uri,
 571            "judgment": judgment_xml.decode("utf-8"),
 572            "annotation": annotation.as_json,
 573        }
 574
 575        return self._send_to_eval(vars, "update_locked_judgment.xqy")
 576
 577    def insert_document_xml(
 578        self,
 579        document_uri: DocumentURIString,
 580        document_xml: Element,
 581        document_type: type[Document],
 582        annotation: VersionAnnotation,
 583    ) -> requests.Response:
 584        """
 585        Insert a new XML document into MarkLogic.
 586
 587        :param document_uri: The URI to insert the document at
 588        :param document_xml: The XML of the document to insert
 589        :param document_type: The type class of the document
 590        :param annotation: Annotations to record alongside this version
 591
 592        :return: The response object from MarkLogic
 593        """
 594        xml = ElementTree.tostring(document_xml)
 595
 596        uri = self._format_uri_for_marklogic(document_uri)
 597
 598        annotation.set_calling_function("insert_document_xml")
 599        annotation.set_calling_agent(self.user_agent)
 600
 601        vars: query_dicts.InsertDocumentDict = {
 602            "uri": uri,
 603            "type_collection": document_type.type_collection_name,
 604            "document": xml.decode("utf-8"),
 605            "annotation": annotation.as_json,
 606        }
 607
 608        return self._send_to_eval(vars, "insert_document.xqy")
 609
 610    def update_document_xml(
 611        self,
 612        document_uri: DocumentURIString,
 613        document_xml: Element,
 614        annotation: VersionAnnotation,
 615    ) -> requests.Response:
 616        """
 617        Updates an existing XML document in MarkLogic with a new version.
 618
 619        This uses `dls:document-checkout-update-checkin` to perform this in a single operation.
 620
 621        :param document_uri: The URI of the document to update
 622        :param document_xml: The new XML content of the document
 623        :param annotation: Annotations to record alongside this version
 624
 625        :return: The response object from MarkLogic
 626        """
 627        xml = ElementTree.tostring(document_xml)
 628
 629        uri = self._format_uri_for_marklogic(document_uri)
 630
 631        annotation.set_calling_function("update_document_xml")
 632        annotation.set_calling_agent(self.user_agent)
 633
 634        vars: query_dicts.UpdateDocumentDict = {
 635            "uri": uri,
 636            "judgment": xml.decode("utf-8"),
 637            "annotation": annotation.as_json,
 638        }
 639
 640        return self._send_to_eval(vars, "update_document.xqy")
 641
 642    def list_judgment_versions(
 643        self,
 644        judgment_uri: DocumentURIString,
 645    ) -> requests.Response:
 646        uri = self._format_uri_for_marklogic(judgment_uri)
 647        vars: query_dicts.ListJudgmentVersionsDict = {"uri": uri}
 648
 649        return self._send_to_eval(vars, "list_judgment_versions.xqy")
 650
 651    def checkout_judgment(
 652        self,
 653        judgment_uri: DocumentURIString,
 654        annotation: str = "",
 655        expires_at_midnight: bool = False,
 656        timeout_seconds: int = -1,
 657    ) -> requests.Response:
 658        """If timeout_seconds is -1, the lock never times out"""
 659        uri = self._format_uri_for_marklogic(judgment_uri)
 660        vars: query_dicts.CheckoutJudgmentDict = {
 661            "uri": uri,
 662            "annotation": annotation,
 663            "timeout": timeout_seconds,
 664        }
 665
 666        if expires_at_midnight:
 667            timeout = self.calculate_seconds_until_midnight()
 668            vars["timeout"] = timeout
 669
 670        return self._send_to_eval(vars, "checkout_judgment.xqy")
 671
 672    def checkin_judgment(self, judgment_uri: DocumentURIString) -> requests.Response:
 673        uri = self._format_uri_for_marklogic(judgment_uri)
 674        vars: query_dicts.CheckinJudgmentDict = {"uri": uri}
 675
 676        return self._send_to_eval(vars, "checkin_judgment.xqy")
 677
 678    def get_judgment_checkout_status(
 679        self,
 680        judgment_uri: DocumentURIString,
 681    ) -> requests.Response:
 682        uri = self._format_uri_for_marklogic(judgment_uri)
 683        vars: query_dicts.GetJudgmentCheckoutStatusDict = {"uri": uri}
 684
 685        return self._send_to_eval(vars, "get_judgment_checkout_status.xqy")
 686
 687    def get_judgment_checkout_status_message(
 688        self,
 689        judgment_uri: DocumentURIString,
 690    ) -> Optional[str]:
 691        """Return the annotation of the lock or `None` if there is no lock."""
 692        response = self.get_judgment_checkout_status(judgment_uri)
 693        if not response.content:
 694            return None
 695        content = decoder.MultipartDecoder.from_response(response).parts[0].text
 696        if content == "":
 697            return None
 698        response_xml = ElementTree.fromstring(content)
 699        return str(
 700            response_xml.find(
 701                "dls:annotation",
 702                namespaces={"dls": "http://marklogic.com/xdmp/dls"},
 703            ).text
 704        )
 705
 706    def get_judgment_version(
 707        self,
 708        judgment_uri: DocumentURIString,
 709        version: int,
 710    ) -> requests.Response:
 711        uri = self._format_uri_for_marklogic(judgment_uri)
 712        vars: query_dicts.GetJudgmentVersionDict = {"uri": uri, "version": str(version)}
 713
 714        return self._send_to_eval(vars, "get_judgment_version.xqy")
 715
 716    def validate_document(self, document_uri: DocumentURIString) -> bool:
 717        vars: query_dicts.ValidateDocumentDict = {
 718            "uri": self._format_uri_for_marklogic(document_uri),
 719        }
 720        response = self._send_to_eval(vars, "validate_document.xqy")
 721        content = decoder.MultipartDecoder.from_response(response).parts[0].text
 722        xml = ElementTree.fromstring(content)
 723        return (
 724            len(
 725                xml.findall(
 726                    ".//error:error",
 727                    {"error": "http://marklogic.com/xdmp/error"},
 728                ),
 729            )
 730            == 0
 731        )
 732
 733    def has_unique_content_hash(self, judgment_uri: DocumentURIString) -> bool:
 734        """
 735        Returns True if the content hash for this document is unique (not shared with other documents).
 736        """
 737        uri = self._format_uri_for_marklogic(judgment_uri)
 738        vars: CheckContentHashUniqueByUriDict = {"uri": uri}
 739        return self._eval_and_decode(vars, "check_content_hash_unique_by_uri.xqy") == "true"
 740
 741    def eval(
 742        self,
 743        xquery_path: str,
 744        vars: str,
 745        accept_header: str = "multipart/mixed",
 746        timeout: tuple[float, float] = (CONNECT_TIMEOUT, READ_TIMEOUT),
 747    ) -> requests.Response:
 748        headers = {
 749            "Content-type": "application/x-www-form-urlencoded",
 750            "Accept": accept_header,
 751        }
 752        data = {
 753            "xquery": Path(xquery_path).read_text(),
 754            "vars": vars,
 755        }
 756        path = "LATEST/eval"
 757
 758        if DEBUG:
 759            print(f"Sending {vars} to {xquery_path}")
 760
 761        response = self.session.request(
 762            "POST",
 763            url=self._path_to_request_url(path),
 764            headers=headers,
 765            data=data,
 766            timeout=timeout,
 767        )
 768        # Raise relevant exception for an erroneous response
 769        self._raise_for_status(response)
 770        return response
 771
 772    def invoke(
 773        self,
 774        module: str,
 775        vars: str,
 776        accept_header: str = "multipart/mixed",
 777    ) -> requests.Response:
 778        headers = {
 779            "Content-type": "application/x-www-form-urlencoded",
 780            "Accept": accept_header,
 781        }
 782        data = {
 783            "module": module,
 784            "vars": vars,
 785        }
 786        path = "LATEST/invoke"
 787        response = self.session.request(
 788            "POST",
 789            url=self._path_to_request_url(path),
 790            headers=headers,
 791            data=data,
 792        )
 793        # Raise relevant exception for an erroneous response
 794        self._raise_for_status(response)
 795        return response
 796
 797    def advanced_search(self, search_parameters: SearchParameters) -> requests.Response:
 798        """
 799        Performs a search on the entire document set.
 800
 801        :param query:
 802        :param court:
 803        :param judge:
 804        :param party:
 805        :param neutral_citation:
 806        :param document_name:
 807        :param consignment_number:
 808        :param specific_keyword:
 809        :param order:
 810        :param date_from:
 811        :param date_to:
 812        :param page:
 813        :param page_size:
 814        :param show_unpublished: If True, both published and unpublished documents will be returned
 815        :param only_unpublished: If True, will only return published documents. Ignores the value of show_unpublished
 816        :param collections:
 817        :return:
 818        """
 819        module = "/judgments/search/search-v2.xqy"  # as stored on Marklogic
 820        search_parameters.show_unpublished = self.verify_show_unpublished(
 821            search_parameters.show_unpublished,
 822        )
 823        vars = json.dumps(search_parameters.as_marklogic_payload())
 824        return self.invoke(module, vars)
 825
 826    def eval_xslt(
 827        self,
 828        judgment_uri: DocumentURIString,
 829        version_uri: Optional[DocumentURIString] = None,
 830        show_unpublished: bool = False,
 831        xsl_filename: str = DEFAULT_XSL_TRANSFORM,
 832        query: Optional[str] = None,
 833    ) -> requests.Response:
 834        marklogic_document_uri = self._format_uri_for_marklogic(judgment_uri)
 835        marklogic_document_version_uri = (
 836            MarkLogicDocumentVersionURIString(
 837                self._format_uri_for_marklogic(version_uri),
 838            )
 839            if version_uri
 840            else None
 841        )
 842
 843        image_location = os.getenv("XSLT_IMAGE_LOCATION", "")
 844
 845        show_unpublished = self.verify_show_unpublished(show_unpublished)
 846
 847        vars: query_dicts.XsltTransformDict = {
 848            "uri": marklogic_document_uri,
 849            "version_uri": marklogic_document_version_uri,
 850            "show_unpublished": show_unpublished,
 851            "img_location": image_location,
 852            "xsl_filename": xsl_filename,
 853            "query": query,
 854        }
 855
 856        return self._send_to_eval(vars, "xslt_transform.xqy")
 857
 858    def accessible_judgment_transformation(
 859        self,
 860        judgment_uri: DocumentURIString,
 861        version_uri: Optional[DocumentURIString] = None,
 862        show_unpublished: bool = False,
 863    ) -> requests.Response:
 864        return self.eval_xslt(
 865            judgment_uri,
 866            version_uri,
 867            show_unpublished,
 868            xsl_filename=DEFAULT_XSL_TRANSFORM,
 869        )
 870
 871    def original_judgment_transformation(
 872        self,
 873        judgment_uri: DocumentURIString,
 874        version_uri: Optional[DocumentURIString] = None,
 875        show_unpublished: bool = False,
 876    ) -> requests.Response:
 877        return self.eval_xslt(
 878            judgment_uri,
 879            version_uri,
 880            show_unpublished,
 881            xsl_filename="as-handed-down.xsl",
 882        )
 883
 884    def get_property(self, judgment_uri: DocumentURIString, name: str) -> str:
 885        uri = self._format_uri_for_marklogic(judgment_uri)
 886        vars: query_dicts.GetPropertyDict = {
 887            "uri": uri,
 888            "name": name,
 889        }
 890        return self._eval_and_decode(vars, "get_property.xqy")
 891
 892    def get_property_as_node(self, judgment_uri: DocumentURIString, name: str) -> Optional[etree._Element]:
 893        uri = self._format_uri_for_marklogic(judgment_uri)
 894        vars: query_dicts.GetPropertyAsNodeDict = {
 895            "uri": uri,
 896            "name": name,
 897        }
 898        value = self._eval_and_decode(vars, "get_property_as_node.xqy")
 899        if not value:
 900            return None
 901        return etree.fromstring(value)
 902
 903    def get_version_annotation(self, judgment_uri: DocumentURIString) -> str:
 904        uri = self._format_uri_for_marklogic(judgment_uri)
 905        vars: query_dicts.GetVersionAnnotationDict = {
 906            "uri": uri,
 907        }
 908        return self._eval_and_decode(vars, "get_version_annotation.xqy")
 909
 910    def get_version_created_datetime(self, judgment_uri: DocumentURIString) -> datetime:
 911        uri = self._format_uri_for_marklogic(judgment_uri)
 912        vars: query_dicts.GetVersionCreatedDict = {
 913            "uri": uri,
 914        }
 915        return datetime.strptime(
 916            self._eval_and_decode(vars, "get_version_created.xqy"),
 917            "%Y-%m-%dT%H:%M:%S.%f%z",
 918        )
 919
 920    def set_property(
 921        self,
 922        judgment_uri: DocumentURIString,
 923        name: str,
 924        value: str,
 925    ) -> requests.Response:
 926        uri = self._format_uri_for_marklogic(judgment_uri)
 927        vars: query_dicts.SetPropertyDict = {
 928            "uri": uri,
 929            "value": value,
 930            "name": name,
 931        }
 932
 933        return self._send_to_eval(vars, "set_property.xqy")
 934
 935    def set_property_as_node(
 936        self,
 937        judgment_uri: DocumentURIString,
 938        name: str,
 939        value: etree._Element,
 940    ) -> requests.Response:
 941        """Given a root node, set the value of the MarkLogic property for a document to the _contents_ of that root node. The root node itself is discarded."""
 942        uri = self._format_uri_for_marklogic(judgment_uri)
 943        vars: query_dicts.SetPropertyAsNodeDict = {
 944            "uri": uri,
 945            "value": etree.tostring(value).decode(),
 946            "name": name,
 947        }
 948
 949        return self._send_to_eval(vars, "set_property_as_node.xqy")
 950
 951    def set_boolean_property(
 952        self,
 953        judgment_uri: DocumentURIString,
 954        name: str,
 955        value: bool,
 956    ) -> requests.Response:
 957        uri = self._format_uri_for_marklogic(judgment_uri)
 958        string_value = "true" if value else "false"
 959        vars: query_dicts.SetBooleanPropertyDict = {
 960            "uri": uri,
 961            "value": string_value,
 962            "name": name,
 963        }
 964        """
 965        Set a property within MarkLogic which is specifically a boolean.
 966
 967        Since XML has no concept of boolean, the actual value in the database is set to `"true"` or `"false"`.
 968        """
 969        return self._send_to_eval(vars, "set_boolean_property.xqy")
 970
 971    def get_boolean_property(self, judgment_uri: DocumentURIString, name: str) -> bool:
 972        """
 973        Get a property from MarkLogic which is specifically a boolean.
 974
 975        :return: `True` if the property exists and has a value of `"true"`, otherwise `False`
 976        """
 977        content = self.get_property(judgment_uri, name)
 978        return content == "true"
 979
 980    def set_datetime_property(
 981        self,
 982        judgment_uri: DocumentURIString,
 983        name: str,
 984        value: datetime,
 985    ) -> requests.Response:
 986        """Set a property within MarkLogic which is specifically a datetime."""
 987        uri = self._format_uri_for_marklogic(judgment_uri)
 988        vars: query_dicts.SetDatetimePropertyDict = {
 989            "uri": uri,
 990            "value": value.isoformat(),
 991            "name": name,
 992        }
 993        return self._send_to_eval(vars, "set_datetime_property.xqy")
 994
 995    def get_datetime_property(self, judgment_uri: DocumentURIString, name: str) -> Optional[datetime]:
 996        """
 997        Get a property from MarkLogic which is specifically a datetime.
 998
 999        :return: A datetime with the value of the property, or `None` if it does not exist
1000        """
1001        content = self.get_property(judgment_uri, name)
1002
1003        if content:
1004            return isoparse(content)
1005
1006        return None
1007
1008    def set_published(
1009        self,
1010        judgment_uri: DocumentURIString,
1011        published: bool,
1012    ) -> requests.Response:
1013        return self.set_boolean_property(judgment_uri, "published", published)
1014
1015    def get_published(self, judgment_uri: DocumentURIString) -> bool:
1016        return self.get_boolean_property(judgment_uri, "published")
1017
1018    def get_last_modified(self, judgment_uri: DocumentURIString) -> str:
1019        uri = self._format_uri_for_marklogic(judgment_uri)
1020        vars: query_dicts.GetLastModifiedDict = {
1021            "uri": uri,
1022        }
1023
1024        response = self._send_to_eval(vars, "get_last_modified.xqy")
1025
1026        if not response.text:
1027            return ""
1028
1029        content = str(decoder.MultipartDecoder.from_response(response).parts[0].text)
1030        return content
1031
1032    def delete_judgment(self, judgment_uri: DocumentURIString) -> requests.Response:
1033        uri = self._format_uri_for_marklogic(judgment_uri)
1034        vars: query_dicts.DeleteJudgmentDict = {"uri": uri}
1035        return self._send_to_eval(vars, "delete_judgment.xqy")
1036
1037    def copy_document(
1038        self,
1039        old: DocumentURIString,
1040        new: DocumentURIString,
1041    ) -> requests.Response:
1042        old_uri = self._format_uri_for_marklogic(old)
1043        new_uri = self._format_uri_for_marklogic(new)
1044
1045        vars: query_dicts.CopyDocumentDict = {
1046            "old_uri": old_uri,
1047            "new_uri": new_uri,
1048        }
1049        return self._send_to_eval(vars, "copy_document.xqy")
1050
1051    def break_checkout(self, judgment_uri: DocumentURIString) -> requests.Response:
1052        uri = self._format_uri_for_marklogic(judgment_uri)
1053        vars: query_dicts.BreakJudgmentCheckoutDict = {
1054            "uri": uri,
1055        }
1056        return self._send_to_eval(vars, "break_judgment_checkout.xqy")
1057
1058    def user_has_privilege(
1059        self,
1060        username: str,
1061        privilege_uri: MarkLogicPrivilegeURIString,
1062        privilege_action: str,
1063    ) -> requests.Response:
1064        vars: query_dicts.UserHasPrivilegeDict = {
1065            "user": username,
1066            "privilege_uri": privilege_uri,
1067            "privilege_action": privilege_action,
1068        }
1069        return self._send_to_eval(vars, "user_has_privilege.xqy")
1070
1071    def user_can_view_unpublished_judgments(self, username: str) -> bool:
1072        if self.user_has_admin_role(username):
1073            return True
1074
1075        check_privilege = self.user_has_privilege(
1076            username,
1077            MarkLogicPrivilegeURIString(
1078                "https://caselaw.nationalarchives.gov.uk/custom/privileges/can-view-unpublished-documents",
1079            ),
1080            "execute",
1081        )
1082        return get_single_string_from_marklogic_response(check_privilege).lower() == "true"
1083
1084    def user_has_role(self, username: str, role: str) -> requests.Response:
1085        vars: query_dicts.UserHasRoleDict = {
1086            "user": username,
1087            "role": role,
1088        }
1089        return self._send_to_eval(vars, "user_has_role.xqy")
1090
1091    def user_has_admin_role(self, username: str) -> bool:
1092        check_role = self.user_has_role(
1093            username,
1094            "admin",
1095        )
1096        multipart_data = decoder.MultipartDecoder.from_response(check_role)
1097        result = str(multipart_data.parts[0].text)
1098        return result.lower() == "true"
1099
1100    def calculate_seconds_until_midnight(self, now: Optional[datetime] = None) -> int:
1101        """
1102        Get timedelta until end of day on the datetime passed, or current time.
1103        https://stackoverflow.com/questions/45986035/seconds-until-end-of-day-in-python
1104        """
1105        if not now:
1106            now = datetime.now()
1107        tomorrow = now + timedelta(days=1)
1108        difference = datetime.combine(tomorrow, time.min) - now
1109
1110        return difference.seconds
1111
1112    def verify_show_unpublished(self, show_unpublished: bool) -> bool:
1113        if show_unpublished and not self.user_can_view_unpublished_judgments(
1114            self.username,
1115        ):
1116            return False
1117        return show_unpublished
1118
1119    def get_properties_for_search_results(
1120        self,
1121        judgment_uris: list[DocumentURIString],
1122    ) -> str:
1123        uris = [self._format_uri_for_marklogic(judgment_uri) for judgment_uri in judgment_uris]
1124        vars: query_dicts.GetPropertiesForSearchResultsDict = {"uris": uris}
1125        response = self._send_to_eval(vars, "get_properties_for_search_results.xqy")
1126        return get_single_string_from_marklogic_response(response)
1127
1128    def search_and_decode_response(self, search_parameters: SearchParameters) -> bytes:
1129        response = self.advanced_search(search_parameters)
1130        return get_single_bytestring_from_marklogic_response(response)
1131
1132    def search_judgments_and_decode_response(
1133        self,
1134        search_parameters: SearchParameters,
1135    ) -> bytes:
1136        search_parameters.collections = [DOCUMENT_COLLECTION_URI_JUDGMENT]
1137        return self.search_and_decode_response(search_parameters)
1138
1139    def update_document_uri(self, old_uri: DocumentURIString, new_citation: NeutralCitationString) -> DocumentURIString:
1140        """
1141        Move the document at old_uri to the correct location based on the neutral citation
1142        The new neutral citation *must* not already exist (that is handled elsewhere)
1143        This might not be needed; changing the URI/neutral citation is vanishingly rare
1144        """
1145        return move.update_document_uri(old_uri, new_citation, api_client=self)
1146
1147    def get_combined_stats_table(self) -> list[list[Any]]:
1148        """Run the combined statistics table xquery and return the result as a list of lists, each representing a table
1149        row."""
1150        results: list[list[Any]] = json.loads(
1151            get_single_string_from_marklogic_response(
1152                self._send_to_eval({}, "get_combined_stats_table.xqy"),
1153            ),
1154        )
1155
1156        return results
1157
1158    def get_highest_enrichment_version(self) -> tuple[int, int]:
1159        """This gets the highest enrichment version in the database,
1160        so if nothing has been enriched with the most recent version of enrichment,
1161        this won't reflect that change."""
1162        table = json.loads(
1163            get_single_string_from_marklogic_response(
1164                self._send_to_eval(
1165                    {},
1166                    "get_highest_enrichment_version.xqy",
1167                ),
1168            ),
1169        )
1170
1171        return (int(table[1][1]), int(table[1][2]))
1172
1173    def get_pending_enrichment_for_version(
1174        self,
1175        target_enrichment_version: tuple[int, int],
1176        target_parser_version: tuple[int, int],
1177        maximum_records: int = 1000,
1178    ) -> list[list[Any]]:
1179        """Retrieve documents which are not yet enriched with a given version."""
1180        vars: query_dicts.GetPendingEnrichmentForVersionDict = {
1181            "target_enrichment_major_version": target_enrichment_version[0],
1182            "target_enrichment_minor_version": target_enrichment_version[1],
1183            "target_parser_major_version": target_parser_version[0],
1184            "target_parser_minor_version": target_parser_version[1],
1185            "maximum_records": maximum_records,
1186        }
1187        results: list[list[Any]] = json.loads(
1188            get_single_string_from_marklogic_response(
1189                self._send_to_eval(
1190                    vars,
1191                    "get_pending_enrichment_for_version.xqy",
1192                ),
1193            ),
1194        )
1195
1196        return results
1197
1198    def get_recently_enriched(
1199        self,
1200    ) -> list[list[Any]]:
1201        """Retrieve documents which are not yet enriched with a given version."""
1202        results: list[list[Any]] = json.loads(
1203            get_single_string_from_marklogic_response(
1204                self._send_to_eval(
1205                    {},
1206                    "get_recently_enriched.xqy",
1207                ),
1208            ),
1209        )
1210
1211        return results
1212
1213    def get_highest_parser_version(self) -> tuple[int, int]:
1214        """This gets the highest parser version in the database, so if nothing has been parsed with the most recent version of the parser, this won't reflect that change."""
1215        table = json.loads(
1216            get_single_string_from_marklogic_response(
1217                self._send_to_eval(
1218                    {},
1219                    "get_highest_parser_version.xqy",
1220                ),
1221            ),
1222        )
1223
1224        return (int(table[1][1]), int(table[1][2]))
1225
1226    def get_documents_pending_parse_for_version(
1227        self,
1228        target_version: tuple[int, int],
1229        maximum_records: int = 1000,
1230    ) -> list[list[Any]]:
1231        """Retrieve a list of documents which are not yet parsed with a given version."""
1232        vars: query_dicts.GetPendingParseForVersionDocumentsDict = {
1233            "target_major_version": target_version[0],
1234            "target_minor_version": target_version[1],
1235            "maximum_records": maximum_records,
1236        }
1237        results: list[list[Any]] = json.loads(
1238            get_single_string_from_marklogic_response(
1239                self._send_to_eval(
1240                    vars,
1241                    "get_pending_parse_for_version_documents.xqy",
1242                ),
1243            ),
1244        )
1245
1246        return results
1247
1248    def get_count_pending_parse_for_version(
1249        self,
1250        target_version: tuple[int, int],
1251    ) -> int:
1252        """Get the total number of documents which are not yet parsed with a given version."""
1253        vars: query_dicts.GetPendingParseForVersionCountDict = {
1254            "target_major_version": target_version[0],
1255            "target_minor_version": target_version[1],
1256        }
1257        results = json.loads(
1258            get_single_string_from_marklogic_response(
1259                self._send_to_eval(
1260                    vars,
1261                    "get_pending_parse_for_version_count.xqy",
1262                ),
1263            ),
1264        )
1265
1266        return int(results[1][0])
1267
1268    def get_recently_parsed(
1269        self,
1270    ) -> list[list[Any]]:
1271        """Retrieve documents which are not yet enriched with a given version."""
1272        results: list[list[Any]] = json.loads(
1273            get_single_string_from_marklogic_response(
1274                self._send_to_eval(
1275                    {},
1276                    "get_recently_parsed.xqy",
1277                ),
1278            ),
1279        )
1280
1281        return results
1282
1283    def get_locked_documents(
1284        self,
1285    ) -> list[DocumentLock]:
1286        """Retrieve all currently locked documents."""
1287        results = [
1288            DocumentLock.from_string(lock)
1289            for lock in get_multipart_strings_from_marklogic_response(
1290                self._send_to_eval({}, "get_locked_documents.xqy")
1291            )
1292        ]
1293
1294        return sorted(results, key=lambda lock: lock.timestamp)
1295
1296    def get_missing_fclid(
1297        self,
1298        maximum_records: int = 50,
1299    ) -> list[str]:
1300        """Retrieve the URIs of published documents which do not have an identifier in the `fclid` schema."""
1301        vars: query_dicts.GetMissingFclidDict = {
1302            "maximum_records": maximum_records,
1303        }
1304
1305        results: list[str] = get_multipart_strings_from_marklogic_response(
1306            self._send_to_eval(
1307                vars,
1308                "get_missing_fclid.xqy",
1309            )
1310        )
1311
1312        return results
1313
1314    def resolve_from_identifier_slug(
1315        self, identifier_slug: DocumentIdentifierSlug, published_only: bool = True
1316    ) -> IdentifierResolutions:
1317        """Given a PUI/EUI url, look up the precomputed slug and return the
1318        MarkLogic document URIs which match that slug. Multiple returns should be anticipated"""
1319        vars: query_dicts.ResolveFromIdentifierSlugDict = {
1320            "identifier_slug": identifier_slug,
1321            "published_only": int(published_only),
1322        }
1323        raw_results: list[str] = get_multipart_strings_from_marklogic_response(
1324            self._send_to_eval(
1325                vars,
1326                "resolve_from_identifier_slug.xqy",
1327            ),
1328        )
1329        return IdentifierResolutions.from_marklogic_output(raw_results)
1330
1331    def resolve_from_identifier_value(
1332        self, identifier_value: DocumentIdentifierValue, published_only: bool = True
1333    ) -> IdentifierResolutions:
1334        """Given a PUI/EUI url, look up the precomputed slug and return the
1335        MarkLogic document URIs which match that slug. Multiple returns should be anticipated"""
1336        vars: query_dicts.ResolveFromIdentifierValueDict = {
1337            "identifier_value": identifier_value,
1338            "published_only": int(published_only),
1339        }
1340        raw_results: list[str] = get_multipart_strings_from_marklogic_response(
1341            self._send_to_eval(
1342                vars,
1343                "resolve_from_identifier_value.xqy",
1344            ),
1345        )
1346        return IdentifierResolutions.from_marklogic_output(raw_results)
1347
1348    def get_next_document_sequence_number(self) -> int:
1349        """Increment the MarkLogic sequence number by one and return the value."""
1350        return int(self._eval_and_decode({}, "get_next_document_sequence_number.xqy"))

The base class for interacting with a MarkLogic instance.

MarklogicApiClient( host: str, username: str, password: str, use_https: bool, user_agent: str = 'ds-caselaw-marklogic-api-client/44.4.5')
192    def __init__(
193        self,
194        host: str,
195        username: str,
196        password: str,
197        use_https: bool,
198        user_agent: str = DEFAULT_USER_AGENT,
199    ) -> None:
200        self.host = host
201        self.username = username
202        self.password = password
203        self.base_url = f"{'https' if use_https else 'http'}://{self.host}:8011"
204        # Apply auth / common headers to the session
205        self.session = requests.Session()
206        self.session.auth = HTTPBasicAuth(username, password)
207        self.session.headers.update({"User-Agent": user_agent})
208        self.user_agent = user_agent
error_code_classes: dict[str, typing.Type[caselawclient.errors.MarklogicAPIError]] = {'XDMP-DOCNOTFOUND': <class 'caselawclient.errors.MarklogicResourceNotFoundError'>, 'XDMP-LOCKCONFLICT': <class 'caselawclient.errors.MarklogicResourceLockedError'>, 'XDMP-LOCKED': <class 'caselawclient.errors.MarklogicResourceLockedError'>, 'DLS-UNMANAGED': <class 'caselawclient.errors.MarklogicResourceUnmanagedError'>, 'DLS-NOTCHECKEDOUT': <class 'caselawclient.errors.MarklogicResourceNotCheckedOutError'>, 'DLS-CHECKOUTCONFLICT': <class 'caselawclient.errors.MarklogicCheckoutConflictError'>, 'SEC-PRIVDNE': <class 'caselawclient.errors.MarklogicNotPermittedError'>, 'XDMP-VALIDATE.*': <class 'caselawclient.errors.MarklogicValidationFailedError'>, 'FCL-DOCUMENTNOTFOUND.*': <class 'caselawclient.errors.DocumentNotFoundError'>}
default_http_error_class = <class 'caselawclient.errors.MarklogicCommunicationError'>
host
username
password
base_url
session
user_agent
def get_press_summaries_for_document_uri( self, uri: caselawclient.types.DocumentURIString) -> list[caselawclient.models.press_summaries.PressSummary]:
210    def get_press_summaries_for_document_uri(
211        self,
212        uri: DocumentURIString,
213    ) -> list[PressSummary]:
214        """
215        Returns a list of PressSummary objects associated with a given Document URI
216        """
217        vars: query_dicts.GetComponentsForDocumentDict = {
218            "parent_uri": uri,
219            "component": "pressSummary",
220        }
221        response = self._send_to_eval(vars, "get_components_for_document.xqy")
222        uris = get_multipart_strings_from_marklogic_response(response)
223        return [
224            PressSummary(DocumentURIString(uri.strip("/").strip(".xml")), self) for uri in uris
225        ]  # TODO: Migrate this strip behaviour into proper manipulation of a MarkLogicURIString

Returns a list of PressSummary objects associated with a given Document URI

def get_document_by_uri( self, uri: caselawclient.types.DocumentURIString, search_query: Optional[str] = None) -> caselawclient.models.documents.Document:
227    def get_document_by_uri(
228        self,
229        uri: DocumentURIString,
230        search_query: Optional[str] = None,
231    ) -> Document:
232        document_type_class = self.get_document_type_from_uri(uri)
233        return document_type_class(uri, self, search_query=search_query)
def get_document_type_from_uri( self, uri: caselawclient.types.DocumentURIString) -> Type[caselawclient.models.documents.Document]:
235    def get_document_type_from_uri(self, uri: DocumentURIString) -> Type[Document]:
236        vars: query_dicts.DocumentCollectionsDict = {
237            "uri": self._format_uri_for_marklogic(uri),
238        }
239        response = self._send_to_eval(vars, "document_collections.xqy")
240        collections = get_multipart_strings_from_marklogic_response(response)
241
242        if DOCUMENT_COLLECTION_URI_JUDGMENT in collections:
243            return Judgment
244        if DOCUMENT_COLLECTION_URI_PRESS_SUMMARY in collections:
245            return PressSummary
246        return Document
def prepare_request_kwargs( self, method: str, path: str, body: Optional[str] = None, data: Optional[dict[str, Any]] = None) -> dict[str, typing.Any]:
354    def prepare_request_kwargs(
355        self,
356        method: str,
357        path: str,
358        body: Optional[str] = None,
359        data: Optional[dict[str, Any]] = None,
360    ) -> dict[str, Any]:
361        kwargs = dict(url=self._path_to_request_url(path))
362        if data is not None:
363            data = {k: v for k, v in data.items() if v is not None}
364            if method == "GET":
365                kwargs["params"] = data  # type: ignore
366            else:
367                kwargs["data"] = json.dumps(data)
368        if body is not None:
369            kwargs["data"] = body
370        return kwargs
def make_request( self, method: str, path: str, headers: requests.structures.CaseInsensitiveDict[typing.Union[str, typing.Any]], body: Optional[str] = None, data: Optional[dict[str, Any]] = None) -> requests.models.Response:
372    def make_request(
373        self,
374        method: str,
375        path: str,
376        headers: CaseInsensitiveDict[Union[str, Any]],
377        body: Optional[str] = None,
378        data: Optional[dict[str, Any]] = None,
379    ) -> requests.Response:
380        kwargs = self.prepare_request_kwargs(method, path, body, data)
381        self.session.headers = headers
382        response = self.session.request(method, **kwargs)
383        # Raise relevant exception for an erroneous response
384        self._raise_for_status(response)
385        return response
def GET( self, path: str, headers: dict[str, typing.Any], **data: Any) -> requests.models.Response:
387    def GET(self, path: str, headers: dict[str, Any], **data: Any) -> requests.Response:
388        logging.warning("GET() is deprecated, use eval() or invoke()")
389        return self.make_request("GET", path, headers, data)  # type: ignore
def POST( self, path: str, headers: dict[str, typing.Any], **data: Any) -> requests.models.Response:
391    def POST(
392        self,
393        path: str,
394        headers: dict[str, Any],
395        **data: Any,
396    ) -> requests.Response:
397        logging.warning("POST() is deprecated, use eval() or invoke()")
398        return self.make_request("POST", path, headers, data)  # type: ignore
def document_exists(self, document_uri: caselawclient.types.DocumentURIString) -> bool:
400    def document_exists(self, document_uri: DocumentURIString) -> bool:
401        uri = self._format_uri_for_marklogic(document_uri)
402        vars: query_dicts.DocumentExistsDict = {
403            "uri": uri,
404        }
405        decoded_response = self._eval_and_decode(vars, "document_exists.xqy")
406
407        if decoded_response == "true":
408            return True
409        if decoded_response == "false":
410            return False
411        raise RuntimeError("Marklogic response was neither true nor false")
def get_judgment_xml_bytestring( self, judgment_uri: caselawclient.types.DocumentURIString, version_uri: Optional[caselawclient.types.DocumentURIString] = None, show_unpublished: bool = False, search_query: Optional[str] = None) -> bytes:
413    def get_judgment_xml_bytestring(
414        self,
415        judgment_uri: DocumentURIString,
416        version_uri: Optional[DocumentURIString] = None,
417        show_unpublished: bool = False,
418        search_query: Optional[str] = None,
419    ) -> bytes:
420        marklogic_document_uri = self._format_uri_for_marklogic(judgment_uri)
421        marklogic_document_version_uri = (
422            MarkLogicDocumentVersionURIString(
423                self._format_uri_for_marklogic(version_uri),
424            )
425            if version_uri
426            else None
427        )
428        show_unpublished = self.verify_show_unpublished(show_unpublished)
429
430        vars: query_dicts.GetJudgmentDict = {
431            "uri": marklogic_document_uri,
432            "version_uri": marklogic_document_version_uri,
433            "show_unpublished": show_unpublished,
434            "search_query": search_query,
435        }
436
437        response = self._eval_as_bytes(vars, "get_judgment.xqy")
438        if not response:
439            raise MarklogicNotPermittedError(
440                "The document is not published and show_unpublished was not set",
441            )
442
443        return response
def get_judgment_xml( self, judgment_uri: caselawclient.types.DocumentURIString, version_uri: Optional[caselawclient.types.DocumentURIString] = None, show_unpublished: bool = False, search_query: Optional[str] = None) -> str:
445    def get_judgment_xml(
446        self,
447        judgment_uri: DocumentURIString,
448        version_uri: Optional[DocumentURIString] = None,
449        show_unpublished: bool = False,
450        search_query: Optional[str] = None,
451    ) -> str:
452        return self.get_judgment_xml_bytestring(
453            judgment_uri,
454            version_uri,
455            show_unpublished,
456            search_query=search_query,
457        ).decode(encoding="utf-8")
def set_document_name( self, document_uri: caselawclient.types.DocumentURIString, content: str) -> requests.models.Response:
459    def set_document_name(
460        self,
461        document_uri: DocumentURIString,
462        content: str,
463    ) -> requests.Response:
464        uri = self._format_uri_for_marklogic(document_uri)
465        vars: query_dicts.SetMetadataNameDict = {"uri": uri, "content": content}
466        return self._send_to_eval(vars, "set_metadata_name.xqy")
def set_judgment_date( self, judgment_uri: caselawclient.types.DocumentURIString, content: str) -> requests.models.Response:
468    def set_judgment_date(
469        self,
470        judgment_uri: DocumentURIString,
471        content: str,
472    ) -> requests.Response:
473        warnings.warn(
474            "set_judgment_date() is deprecated, use set_document_work_expression_date()",
475            DeprecationWarning,
476            stacklevel=2,
477        )
478        return self.set_document_work_expression_date(judgment_uri, content)
def set_document_work_expression_date( self, document_uri: caselawclient.types.DocumentURIString, content: str) -> requests.models.Response:
480    def set_document_work_expression_date(
481        self,
482        document_uri: DocumentURIString,
483        content: str,
484    ) -> requests.Response:
485        uri = self._format_uri_for_marklogic(document_uri)
486        vars: query_dicts.SetMetadataWorkExpressionDateDict = {
487            "uri": uri,
488            "content": content,
489        }
490
491        return self._send_to_eval(vars, "set_metadata_work_expression_date.xqy")
def set_judgment_citation( self, judgment_uri: caselawclient.types.DocumentURIString, content: str) -> requests.models.Response:
493    def set_judgment_citation(
494        self,
495        judgment_uri: DocumentURIString,
496        content: str,
497    ) -> requests.Response:
498        uri = self._format_uri_for_marklogic(judgment_uri)
499        vars: query_dicts.SetMetadataCitationDict = {
500            "uri": uri,
501            "content": content.strip(),
502        }
503
504        return self._send_to_eval(vars, "set_metadata_citation.xqy")
def set_document_court( self, document_uri: caselawclient.types.DocumentURIString, content: str) -> requests.models.Response:
506    def set_document_court(
507        self,
508        document_uri: DocumentURIString,
509        content: str,
510    ) -> requests.Response:
511        uri = self._format_uri_for_marklogic(document_uri)
512        vars: query_dicts.SetMetadataCourtDict = {"uri": uri, "content": content}
513
514        return self._send_to_eval(vars, "set_metadata_court.xqy")
def set_document_jurisdiction( self, document_uri: caselawclient.types.DocumentURIString, content: str) -> requests.models.Response:
516    def set_document_jurisdiction(
517        self,
518        document_uri: DocumentURIString,
519        content: str,
520    ) -> requests.Response:
521        uri = self._format_uri_for_marklogic(document_uri)
522        vars: query_dicts.SetMetadataJurisdictionDict = {"uri": uri, "content": content}
523        return self._send_to_eval(vars, "set_metadata_jurisdiction.xqy")
def set_document_court_and_jurisdiction( self, document_uri: caselawclient.types.DocumentURIString, content: str) -> requests.models.Response:
525    def set_document_court_and_jurisdiction(
526        self,
527        document_uri: DocumentURIString,
528        content: str,
529    ) -> requests.Response:
530        if "/" in content:
531            court, jurisdiction = re.split("\\s*/\\s*", content)
532            self.set_document_court(document_uri, court)
533            return self.set_document_jurisdiction(document_uri, jurisdiction)
534        self.set_document_court(document_uri, content)
535        return self.set_document_jurisdiction(document_uri, "")
def set_judgment_this_uri( self, judgment_uri: caselawclient.types.DocumentURIString) -> requests.models.Response:
537    def set_judgment_this_uri(
538        self,
539        judgment_uri: DocumentURIString,
540    ) -> requests.Response:
541        uri = self._format_uri_for_marklogic(judgment_uri)
542        content_with_id = f"https://caselaw.nationalarchives.gov.uk/id/{judgment_uri.lstrip('/')}"
543        content_without_id = f"https://caselaw.nationalarchives.gov.uk/{judgment_uri.lstrip('/')}"
544        content_with_xml = f"https://caselaw.nationalarchives.gov.uk/{judgment_uri.lstrip('/')}/data.xml"
545        vars: query_dicts.SetMetadataThisUriDict = {
546            "uri": uri,
547            "content_with_id": content_with_id,
548            "content_without_id": content_without_id,
549            "content_with_xml": content_with_xml,
550        }
551
552        return self._send_to_eval(vars, "set_metadata_this_uri.xqy")
def save_locked_judgment_xml( self, judgment_uri: caselawclient.types.DocumentURIString, judgment_xml: bytes, annotation: caselawclient.models.documents.versions.VersionAnnotation) -> requests.models.Response:
554    def save_locked_judgment_xml(
555        self,
556        judgment_uri: DocumentURIString,
557        judgment_xml: bytes,
558        annotation: VersionAnnotation,
559    ) -> requests.Response:
560        """assumes the judgment is already locked, does not unlock/check in
561        note this version assumes the XML is raw bytes, rather than a tree..."""
562
563        validate_content_hash(judgment_xml)
564        uri = self._format_uri_for_marklogic(judgment_uri)
565
566        annotation.set_calling_function("save_locked_judgment_xml")
567        annotation.set_calling_agent(self.user_agent)
568
569        vars: query_dicts.UpdateLockedJudgmentDict = {
570            "uri": uri,
571            "judgment": judgment_xml.decode("utf-8"),
572            "annotation": annotation.as_json,
573        }
574
575        return self._send_to_eval(vars, "update_locked_judgment.xqy")

Assumes the judgment is already locked; does not unlock or check in. Note: this version assumes the XML is raw bytes, rather than a tree.

def insert_document_xml( self, document_uri: caselawclient.types.DocumentURIString, document_xml: xml.etree.ElementTree.Element, document_type: type[caselawclient.models.documents.Document], annotation: caselawclient.models.documents.versions.VersionAnnotation) -> requests.models.Response:
577    def insert_document_xml(
578        self,
579        document_uri: DocumentURIString,
580        document_xml: Element,
581        document_type: type[Document],
582        annotation: VersionAnnotation,
583    ) -> requests.Response:
584        """
585        Insert a new XML document into MarkLogic.
586
587        :param document_uri: The URI to insert the document at
588        :param document_xml: The XML of the document to insert
589        :param document_type: The type class of the document
590        :param annotation: Annotations to record alongside this version
591
592        :return: The response object from MarkLogic
593        """
594        xml = ElementTree.tostring(document_xml)
595
596        uri = self._format_uri_for_marklogic(document_uri)
597
598        annotation.set_calling_function("insert_document_xml")
599        annotation.set_calling_agent(self.user_agent)
600
601        vars: query_dicts.InsertDocumentDict = {
602            "uri": uri,
603            "type_collection": document_type.type_collection_name,
604            "document": xml.decode("utf-8"),
605            "annotation": annotation.as_json,
606        }
607
608        return self._send_to_eval(vars, "insert_document.xqy")

Insert a new XML document into MarkLogic.

Parameters
  • document_uri: The URI to insert the document at
  • document_xml: The XML of the document to insert
  • document_type: The type class of the document
  • annotation: Annotations to record alongside this version
Returns

The response object from MarkLogic

def update_document_xml( self, document_uri: caselawclient.types.DocumentURIString, document_xml: xml.etree.ElementTree.Element, annotation: caselawclient.models.documents.versions.VersionAnnotation) -> requests.models.Response:
610    def update_document_xml(
611        self,
612        document_uri: DocumentURIString,
613        document_xml: Element,
614        annotation: VersionAnnotation,
615    ) -> requests.Response:
616        """
617        Updates an existing XML document in MarkLogic with a new version.
618
619        This uses `dls:document-checkout-update-checkin` to perform this in a single operation.
620
621        :param document_uri: The URI of the document to update
622        :param document_xml: The new XML content of the document
623        :param annotation: Annotations to record alongside this version
624
625        :return: The response object from MarkLogic
626        """
627        xml = ElementTree.tostring(document_xml)
628
629        uri = self._format_uri_for_marklogic(document_uri)
630
631        annotation.set_calling_function("update_document_xml")
632        annotation.set_calling_agent(self.user_agent)
633
634        vars: query_dicts.UpdateDocumentDict = {
635            "uri": uri,
636            "judgment": xml.decode("utf-8"),
637            "annotation": annotation.as_json,
638        }
639
640        return self._send_to_eval(vars, "update_document.xqy")

Updates an existing XML document in MarkLogic with a new version.

This uses dls:document-checkout-update-checkin to perform this in a single operation.

Parameters
  • document_uri: The URI of the document to update
  • document_xml: The new XML content of the document
  • annotation: Annotations to record alongside this version
Returns

The response object from MarkLogic

def list_judgment_versions( self, judgment_uri: caselawclient.types.DocumentURIString) -> requests.models.Response:
642    def list_judgment_versions(
643        self,
644        judgment_uri: DocumentURIString,
645    ) -> requests.Response:
646        uri = self._format_uri_for_marklogic(judgment_uri)
647        vars: query_dicts.ListJudgmentVersionsDict = {"uri": uri}
648
649        return self._send_to_eval(vars, "list_judgment_versions.xqy")
def checkout_judgment( self, judgment_uri: caselawclient.types.DocumentURIString, annotation: str = '', expires_at_midnight: bool = False, timeout_seconds: int = -1) -> requests.models.Response:
651    def checkout_judgment(
652        self,
653        judgment_uri: DocumentURIString,
654        annotation: str = "",
655        expires_at_midnight: bool = False,
656        timeout_seconds: int = -1,
657    ) -> requests.Response:
658        """If timeout_seconds is -1, the lock never times out"""
659        uri = self._format_uri_for_marklogic(judgment_uri)
660        vars: query_dicts.CheckoutJudgmentDict = {
661            "uri": uri,
662            "annotation": annotation,
663            "timeout": timeout_seconds,
664        }
665
666        if expires_at_midnight:
667            timeout = self.calculate_seconds_until_midnight()
668            vars["timeout"] = timeout
669
670        return self._send_to_eval(vars, "checkout_judgment.xqy")

If timeout_seconds is -1, the lock never times out

def checkin_judgment( self, judgment_uri: caselawclient.types.DocumentURIString) -> requests.models.Response:
672    def checkin_judgment(self, judgment_uri: DocumentURIString) -> requests.Response:
673        uri = self._format_uri_for_marklogic(judgment_uri)
674        vars: query_dicts.CheckinJudgmentDict = {"uri": uri}
675
676        return self._send_to_eval(vars, "checkin_judgment.xqy")
def get_judgment_checkout_status( self, judgment_uri: caselawclient.types.DocumentURIString) -> requests.models.Response:
678    def get_judgment_checkout_status(
679        self,
680        judgment_uri: DocumentURIString,
681    ) -> requests.Response:
682        uri = self._format_uri_for_marklogic(judgment_uri)
683        vars: query_dicts.GetJudgmentCheckoutStatusDict = {"uri": uri}
684
685        return self._send_to_eval(vars, "get_judgment_checkout_status.xqy")
def get_judgment_checkout_status_message( self, judgment_uri: caselawclient.types.DocumentURIString) -> Optional[str]:
687    def get_judgment_checkout_status_message(
688        self,
689        judgment_uri: DocumentURIString,
690    ) -> Optional[str]:
691        """Return the annotation of the lock or `None` if there is no lock."""
692        response = self.get_judgment_checkout_status(judgment_uri)
693        if not response.content:
694            return None
695        content = decoder.MultipartDecoder.from_response(response).parts[0].text
696        if content == "":
697            return None
698        response_xml = ElementTree.fromstring(content)
699        return str(
700            response_xml.find(
701                "dls:annotation",
702                namespaces={"dls": "http://marklogic.com/xdmp/dls"},
703            ).text
704        )

Return the annotation of the lock or None if there is no lock.

def get_judgment_version( self, judgment_uri: caselawclient.types.DocumentURIString, version: int) -> requests.models.Response:
706    def get_judgment_version(
707        self,
708        judgment_uri: DocumentURIString,
709        version: int,
710    ) -> requests.Response:
711        uri = self._format_uri_for_marklogic(judgment_uri)
712        vars: query_dicts.GetJudgmentVersionDict = {"uri": uri, "version": str(version)}
713
714        return self._send_to_eval(vars, "get_judgment_version.xqy")
def validate_document(self, document_uri: caselawclient.types.DocumentURIString) -> bool:
716    def validate_document(self, document_uri: DocumentURIString) -> bool:
717        vars: query_dicts.ValidateDocumentDict = {
718            "uri": self._format_uri_for_marklogic(document_uri),
719        }
720        response = self._send_to_eval(vars, "validate_document.xqy")
721        content = decoder.MultipartDecoder.from_response(response).parts[0].text
722        xml = ElementTree.fromstring(content)
723        return (
724            len(
725                xml.findall(
726                    ".//error:error",
727                    {"error": "http://marklogic.com/xdmp/error"},
728                ),
729            )
730            == 0
731        )
def has_unique_content_hash(self, judgment_uri: caselawclient.types.DocumentURIString) -> bool:
733    def has_unique_content_hash(self, judgment_uri: DocumentURIString) -> bool:
734        """
735        Returns True if the content hash for this document is unique (not shared with other documents).
736        """
737        uri = self._format_uri_for_marklogic(judgment_uri)
738        vars: CheckContentHashUniqueByUriDict = {"uri": uri}
739        return self._eval_and_decode(vars, "check_content_hash_unique_by_uri.xqy") == "true"

Returns True if the content hash for this document is unique (not shared with other documents).

def eval( self, xquery_path: str, vars: str, accept_header: str = 'multipart/mixed', timeout: tuple[float, float] = (3.05, 10.0)) -> requests.models.Response:
741    def eval(
742        self,
743        xquery_path: str,
744        vars: str,
745        accept_header: str = "multipart/mixed",
746        timeout: tuple[float, float] = (CONNECT_TIMEOUT, READ_TIMEOUT),
747    ) -> requests.Response:
748        headers = {
749            "Content-type": "application/x-www-form-urlencoded",
750            "Accept": accept_header,
751        }
752        data = {
753            "xquery": Path(xquery_path).read_text(),
754            "vars": vars,
755        }
756        path = "LATEST/eval"
757
758        if DEBUG:
759            print(f"Sending {vars} to {xquery_path}")
760
761        response = self.session.request(
762            "POST",
763            url=self._path_to_request_url(path),
764            headers=headers,
765            data=data,
766            timeout=timeout,
767        )
768        # Raise relevant exception for an erroneous response
769        self._raise_for_status(response)
770        return response
def invoke( self, module: str, vars: str, accept_header: str = 'multipart/mixed') -> requests.models.Response:
772    def invoke(
773        self,
774        module: str,
775        vars: str,
776        accept_header: str = "multipart/mixed",
777    ) -> requests.Response:
778        headers = {
779            "Content-type": "application/x-www-form-urlencoded",
780            "Accept": accept_header,
781        }
782        data = {
783            "module": module,
784            "vars": vars,
785        }
786        path = "LATEST/invoke"
787        response = self.session.request(
788            "POST",
789            url=self._path_to_request_url(path),
790            headers=headers,
791            data=data,
792        )
793        # Raise relevant exception for an erroneous response
794        self._raise_for_status(response)
795        return response
def eval_xslt( self, judgment_uri: caselawclient.types.DocumentURIString, version_uri: Optional[caselawclient.types.DocumentURIString] = None, show_unpublished: bool = False, xsl_filename: str = 'accessible-html.xsl', query: Optional[str] = None) -> requests.models.Response:
826    def eval_xslt(
827        self,
828        judgment_uri: DocumentURIString,
829        version_uri: Optional[DocumentURIString] = None,
830        show_unpublished: bool = False,
831        xsl_filename: str = DEFAULT_XSL_TRANSFORM,
832        query: Optional[str] = None,
833    ) -> requests.Response:
834        marklogic_document_uri = self._format_uri_for_marklogic(judgment_uri)
835        marklogic_document_version_uri = (
836            MarkLogicDocumentVersionURIString(
837                self._format_uri_for_marklogic(version_uri),
838            )
839            if version_uri
840            else None
841        )
842
843        image_location = os.getenv("XSLT_IMAGE_LOCATION", "")
844
845        show_unpublished = self.verify_show_unpublished(show_unpublished)
846
847        vars: query_dicts.XsltTransformDict = {
848            "uri": marklogic_document_uri,
849            "version_uri": marklogic_document_version_uri,
850            "show_unpublished": show_unpublished,
851            "img_location": image_location,
852            "xsl_filename": xsl_filename,
853            "query": query,
854        }
855
856        return self._send_to_eval(vars, "xslt_transform.xqy")
def accessible_judgment_transformation( self, judgment_uri: caselawclient.types.DocumentURIString, version_uri: Optional[caselawclient.types.DocumentURIString] = None, show_unpublished: bool = False) -> requests.models.Response:
858    def accessible_judgment_transformation(
859        self,
860        judgment_uri: DocumentURIString,
861        version_uri: Optional[DocumentURIString] = None,
862        show_unpublished: bool = False,
863    ) -> requests.Response:
864        return self.eval_xslt(
865            judgment_uri,
866            version_uri,
867            show_unpublished,
868            xsl_filename=DEFAULT_XSL_TRANSFORM,
869        )
def original_judgment_transformation( self, judgment_uri: caselawclient.types.DocumentURIString, version_uri: Optional[caselawclient.types.DocumentURIString] = None, show_unpublished: bool = False) -> requests.models.Response:
871    def original_judgment_transformation(
872        self,
873        judgment_uri: DocumentURIString,
874        version_uri: Optional[DocumentURIString] = None,
875        show_unpublished: bool = False,
876    ) -> requests.Response:
877        return self.eval_xslt(
878            judgment_uri,
879            version_uri,
880            show_unpublished,
881            xsl_filename="as-handed-down.xsl",
882        )
def get_property( self, judgment_uri: caselawclient.types.DocumentURIString, name: str) -> str:
884    def get_property(self, judgment_uri: DocumentURIString, name: str) -> str:
885        uri = self._format_uri_for_marklogic(judgment_uri)
886        vars: query_dicts.GetPropertyDict = {
887            "uri": uri,
888            "name": name,
889        }
890        return self._eval_and_decode(vars, "get_property.xqy")
def get_property_as_node( self, judgment_uri: caselawclient.types.DocumentURIString, name: str) -> Optional[lxml.etree._Element]:
892    def get_property_as_node(self, judgment_uri: DocumentURIString, name: str) -> Optional[etree._Element]:
893        uri = self._format_uri_for_marklogic(judgment_uri)
894        vars: query_dicts.GetPropertyAsNodeDict = {
895            "uri": uri,
896            "name": name,
897        }
898        value = self._eval_and_decode(vars, "get_property_as_node.xqy")
899        if not value:
900            return None
901        return etree.fromstring(value)
def get_version_annotation(self, judgment_uri: caselawclient.types.DocumentURIString) -> str:
903    def get_version_annotation(self, judgment_uri: DocumentURIString) -> str:
904        uri = self._format_uri_for_marklogic(judgment_uri)
905        vars: query_dicts.GetVersionAnnotationDict = {
906            "uri": uri,
907        }
908        return self._eval_and_decode(vars, "get_version_annotation.xqy")
def get_version_created_datetime( self, judgment_uri: caselawclient.types.DocumentURIString) -> datetime.datetime:
910    def get_version_created_datetime(self, judgment_uri: DocumentURIString) -> datetime:
911        uri = self._format_uri_for_marklogic(judgment_uri)
912        vars: query_dicts.GetVersionCreatedDict = {
913            "uri": uri,
914        }
915        return datetime.strptime(
916            self._eval_and_decode(vars, "get_version_created.xqy"),
917            "%Y-%m-%dT%H:%M:%S.%f%z",
918        )
def set_property( self, judgment_uri: caselawclient.types.DocumentURIString, name: str, value: str) -> requests.models.Response:
920    def set_property(
921        self,
922        judgment_uri: DocumentURIString,
923        name: str,
924        value: str,
925    ) -> requests.Response:
926        uri = self._format_uri_for_marklogic(judgment_uri)
927        vars: query_dicts.SetPropertyDict = {
928            "uri": uri,
929            "value": value,
930            "name": name,
931        }
932
933        return self._send_to_eval(vars, "set_property.xqy")
def set_property_as_node( self, judgment_uri: caselawclient.types.DocumentURIString, name: str, value: lxml.etree._Element) -> requests.models.Response:
935    def set_property_as_node(
936        self,
937        judgment_uri: DocumentURIString,
938        name: str,
939        value: etree._Element,
940    ) -> requests.Response:
941        """Given a root node, set the value of the MarkLogic property for a document to the _contents_ of that root node. The root node itself is discarded."""
942        uri = self._format_uri_for_marklogic(judgment_uri)
943        vars: query_dicts.SetPropertyAsNodeDict = {
944            "uri": uri,
945            "value": etree.tostring(value).decode(),
946            "name": name,
947        }
948
949        return self._send_to_eval(vars, "set_property_as_node.xqy")

Given a root node, set the value of the MarkLogic property for a document to the _contents_ of that root node. The root node itself is discarded.

def set_boolean_property( self, judgment_uri: caselawclient.types.DocumentURIString, name: str, value: bool) -> requests.models.Response:
951    def set_boolean_property(
952        self,
953        judgment_uri: DocumentURIString,
954        name: str,
955        value: bool,
956    ) -> requests.Response:
957        uri = self._format_uri_for_marklogic(judgment_uri)
958        string_value = "true" if value else "false"
959        vars: query_dicts.SetBooleanPropertyDict = {
960            "uri": uri,
961            "value": string_value,
962            "name": name,
963        }
964        """
965        Set a property within MarkLogic which is specifically a boolean.
966
967        Since XML has no concept of boolean, the actual value in the database is set to `"true"` or `"false"`.
968        """
969        return self._send_to_eval(vars, "set_boolean_property.xqy")
def get_boolean_property( self, judgment_uri: caselawclient.types.DocumentURIString, name: str) -> bool:
971    def get_boolean_property(self, judgment_uri: DocumentURIString, name: str) -> bool:
972        """
973        Get a property from MarkLogic which is specifically a boolean.
974
975        :return: `True` if the property exists and has a value of `"true"`, otherwise `False`
976        """
977        content = self.get_property(judgment_uri, name)
978        return content == "true"

Get a property from MarkLogic which is specifically a boolean.

Returns

True if the property exists and has a value of "true", otherwise False

def set_datetime_property( self, judgment_uri: caselawclient.types.DocumentURIString, name: str, value: datetime.datetime) -> requests.models.Response:
980    def set_datetime_property(
981        self,
982        judgment_uri: DocumentURIString,
983        name: str,
984        value: datetime,
985    ) -> requests.Response:
986        """Set a property within MarkLogic which is specifically a datetime."""
987        uri = self._format_uri_for_marklogic(judgment_uri)
988        vars: query_dicts.SetDatetimePropertyDict = {
989            "uri": uri,
990            "value": value.isoformat(),
991            "name": name,
992        }
993        return self._send_to_eval(vars, "set_datetime_property.xqy")

Set a property within MarkLogic which is specifically a datetime.

def get_datetime_property( self, judgment_uri: caselawclient.types.DocumentURIString, name: str) -> Optional[datetime.datetime]:
 995    def get_datetime_property(self, judgment_uri: DocumentURIString, name: str) -> Optional[datetime]:
 996        """
 997        Get a property from MarkLogic which is specifically a datetime.
 998
 999        :return: A datetime with the value of the property, or `None` if it does not exist
1000        """
1001        content = self.get_property(judgment_uri, name)
1002
1003        if content:
1004            return isoparse(content)
1005
1006        return None

Get a property from MarkLogic which is specifically a datetime.

Returns

A datetime with the value of the property, or None if it does not exist

def set_published( self, judgment_uri: caselawclient.types.DocumentURIString, published: bool) -> requests.models.Response:
1008    def set_published(
1009        self,
1010        judgment_uri: DocumentURIString,
1011        published: bool,
1012    ) -> requests.Response:
1013        return self.set_boolean_property(judgment_uri, "published", published)
def get_published(self, judgment_uri: caselawclient.types.DocumentURIString) -> bool:
1015    def get_published(self, judgment_uri: DocumentURIString) -> bool:
1016        return self.get_boolean_property(judgment_uri, "published")
def get_last_modified(self, judgment_uri: caselawclient.types.DocumentURIString) -> str:
1018    def get_last_modified(self, judgment_uri: DocumentURIString) -> str:
1019        uri = self._format_uri_for_marklogic(judgment_uri)
1020        vars: query_dicts.GetLastModifiedDict = {
1021            "uri": uri,
1022        }
1023
1024        response = self._send_to_eval(vars, "get_last_modified.xqy")
1025
1026        if not response.text:
1027            return ""
1028
1029        content = str(decoder.MultipartDecoder.from_response(response).parts[0].text)
1030        return content
def delete_judgment( self, judgment_uri: caselawclient.types.DocumentURIString) -> requests.models.Response:
1032    def delete_judgment(self, judgment_uri: DocumentURIString) -> requests.Response:
1033        uri = self._format_uri_for_marklogic(judgment_uri)
1034        vars: query_dicts.DeleteJudgmentDict = {"uri": uri}
1035        return self._send_to_eval(vars, "delete_judgment.xqy")
def copy_document( self, old: caselawclient.types.DocumentURIString, new: caselawclient.types.DocumentURIString) -> requests.models.Response:
1037    def copy_document(
1038        self,
1039        old: DocumentURIString,
1040        new: DocumentURIString,
1041    ) -> requests.Response:
1042        old_uri = self._format_uri_for_marklogic(old)
1043        new_uri = self._format_uri_for_marklogic(new)
1044
1045        vars: query_dicts.CopyDocumentDict = {
1046            "old_uri": old_uri,
1047            "new_uri": new_uri,
1048        }
1049        return self._send_to_eval(vars, "copy_document.xqy")
def break_checkout( self, judgment_uri: caselawclient.types.DocumentURIString) -> requests.models.Response:
1051    def break_checkout(self, judgment_uri: DocumentURIString) -> requests.Response:
1052        uri = self._format_uri_for_marklogic(judgment_uri)
1053        vars: query_dicts.BreakJudgmentCheckoutDict = {
1054            "uri": uri,
1055        }
1056        return self._send_to_eval(vars, "break_judgment_checkout.xqy")
def user_has_privilege( self, username: str, privilege_uri: caselawclient.xquery_type_dicts.MarkLogicPrivilegeURIString, privilege_action: str) -> requests.models.Response:
1058    def user_has_privilege(
1059        self,
1060        username: str,
1061        privilege_uri: MarkLogicPrivilegeURIString,
1062        privilege_action: str,
1063    ) -> requests.Response:
1064        vars: query_dicts.UserHasPrivilegeDict = {
1065            "user": username,
1066            "privilege_uri": privilege_uri,
1067            "privilege_action": privilege_action,
1068        }
1069        return self._send_to_eval(vars, "user_has_privilege.xqy")
def user_can_view_unpublished_judgments(self, username: str) -> bool:
1071    def user_can_view_unpublished_judgments(self, username: str) -> bool:
1072        if self.user_has_admin_role(username):
1073            return True
1074
1075        check_privilege = self.user_has_privilege(
1076            username,
1077            MarkLogicPrivilegeURIString(
1078                "https://caselaw.nationalarchives.gov.uk/custom/privileges/can-view-unpublished-documents",
1079            ),
1080            "execute",
1081        )
1082        return get_single_string_from_marklogic_response(check_privilege).lower() == "true"
def user_has_role(self, username: str, role: str) -> requests.models.Response:
1084    def user_has_role(self, username: str, role: str) -> requests.Response:
1085        vars: query_dicts.UserHasRoleDict = {
1086            "user": username,
1087            "role": role,
1088        }
1089        return self._send_to_eval(vars, "user_has_role.xqy")
def user_has_admin_role(self, username: str) -> bool:
1091    def user_has_admin_role(self, username: str) -> bool:
1092        check_role = self.user_has_role(
1093            username,
1094            "admin",
1095        )
1096        multipart_data = decoder.MultipartDecoder.from_response(check_role)
1097        result = str(multipart_data.parts[0].text)
1098        return result.lower() == "true"
def calculate_seconds_until_midnight(self, now: Optional[datetime.datetime] = None) -> int:
1100    def calculate_seconds_until_midnight(self, now: Optional[datetime] = None) -> int:
1101        """
1102        Get timedelta until end of day on the datetime passed, or current time.
1103        https://stackoverflow.com/questions/45986035/seconds-until-end-of-day-in-python
1104        """
1105        if not now:
1106            now = datetime.now()
1107        tomorrow = now + timedelta(days=1)
1108        difference = datetime.combine(tomorrow, time.min) - now
1109
1110        return difference.seconds

Get the number of seconds until the end of the day on the datetime passed, or on the current time if none is given. https://stackoverflow.com/questions/45986035/seconds-until-end-of-day-in-python

def verify_show_unpublished(self, show_unpublished: bool) -> bool:
1112    def verify_show_unpublished(self, show_unpublished: bool) -> bool:
1113        if show_unpublished and not self.user_can_view_unpublished_judgments(
1114            self.username,
1115        ):
1116            return False
1117        return show_unpublished
def get_properties_for_search_results(self, judgment_uris: list[caselawclient.types.DocumentURIString]) -> str:
1119    def get_properties_for_search_results(
1120        self,
1121        judgment_uris: list[DocumentURIString],
1122    ) -> str:
1123        uris = [self._format_uri_for_marklogic(judgment_uri) for judgment_uri in judgment_uris]
1124        vars: query_dicts.GetPropertiesForSearchResultsDict = {"uris": uris}
1125        response = self._send_to_eval(vars, "get_properties_for_search_results.xqy")
1126        return get_single_string_from_marklogic_response(response)
def search_and_decode_response( self, search_parameters: caselawclient.search_parameters.SearchParameters) -> bytes:
1128    def search_and_decode_response(self, search_parameters: SearchParameters) -> bytes:
1129        response = self.advanced_search(search_parameters)
1130        return get_single_bytestring_from_marklogic_response(response)
def search_judgments_and_decode_response( self, search_parameters: caselawclient.search_parameters.SearchParameters) -> bytes:
1132    def search_judgments_and_decode_response(
1133        self,
1134        search_parameters: SearchParameters,
1135    ) -> bytes:
1136        search_parameters.collections = [DOCUMENT_COLLECTION_URI_JUDGMENT]
1137        return self.search_and_decode_response(search_parameters)
def update_document_uri( self, old_uri: caselawclient.types.DocumentURIString, new_citation: ds_caselaw_utils.types.NeutralCitationString) -> caselawclient.types.DocumentURIString:
1139    def update_document_uri(self, old_uri: DocumentURIString, new_citation: NeutralCitationString) -> DocumentURIString:
1140        """
1141        Move the document at old_uri to the correct location based on the neutral citation
1142        The new neutral citation *must* not already exist (that is handled elsewhere)
1143        This might not be needed; changing the URI/neutral citation is vanishingly rare
1144        """
1145        return move.update_document_uri(old_uri, new_citation, api_client=self)

Move the document at old_uri to the correct location based on the neutral citation The new neutral citation must not already exist (that is handled elsewhere) This might not be needed; changing the URI/neutral citation is vanishingly rare

def get_combined_stats_table(self) -> list[list[typing.Any]]:
1147    def get_combined_stats_table(self) -> list[list[Any]]:
1148        """Run the combined statistics table xquery and return the result as a list of lists, each representing a table
1149        row."""
1150        results: list[list[Any]] = json.loads(
1151            get_single_string_from_marklogic_response(
1152                self._send_to_eval({}, "get_combined_stats_table.xqy"),
1153            ),
1154        )
1155
1156        return results

Run the combined statistics table xquery and return the result as a list of lists, each representing a table row.

def get_highest_enrichment_version(self) -> tuple[int, int]:
1158    def get_highest_enrichment_version(self) -> tuple[int, int]:
1159        """This gets the highest enrichment version in the database,
1160        so if nothing has been enriched with the most recent version of enrichment,
1161        this won't reflect that change."""
1162        table = json.loads(
1163            get_single_string_from_marklogic_response(
1164                self._send_to_eval(
1165                    {},
1166                    "get_highest_enrichment_version.xqy",
1167                ),
1168            ),
1169        )
1170
1171        return (int(table[1][1]), int(table[1][2]))

This gets the highest enrichment version in the database, so if nothing has been enriched with the most recent version of enrichment, this won't reflect that change.

def get_pending_enrichment_for_version( self, target_enrichment_version: tuple[int, int], target_parser_version: tuple[int, int], maximum_records: int = 1000) -> list[list[typing.Any]]:
1173    def get_pending_enrichment_for_version(
1174        self,
1175        target_enrichment_version: tuple[int, int],
1176        target_parser_version: tuple[int, int],
1177        maximum_records: int = 1000,
1178    ) -> list[list[Any]]:
1179        """Retrieve documents which are not yet enriched with a given version."""
1180        vars: query_dicts.GetPendingEnrichmentForVersionDict = {
1181            "target_enrichment_major_version": target_enrichment_version[0],
1182            "target_enrichment_minor_version": target_enrichment_version[1],
1183            "target_parser_major_version": target_parser_version[0],
1184            "target_parser_minor_version": target_parser_version[1],
1185            "maximum_records": maximum_records,
1186        }
1187        results: list[list[Any]] = json.loads(
1188            get_single_string_from_marklogic_response(
1189                self._send_to_eval(
1190                    vars,
1191                    "get_pending_enrichment_for_version.xqy",
1192                ),
1193            ),
1194        )
1195
1196        return results

Retrieve documents which are not yet enriched with a given version.

def get_recently_enriched(self) -> list[list[typing.Any]]:
1198    def get_recently_enriched(
1199        self,
1200    ) -> list[list[Any]]:
1201        """Retrieve documents which are not yet enriched with a given version."""
1202        results: list[list[Any]] = json.loads(
1203            get_single_string_from_marklogic_response(
1204                self._send_to_eval(
1205                    {},
1206                    "get_recently_enriched.xqy",
1207                ),
1208            ),
1209        )
1210
1211        return results

Retrieve the documents reported by the get_recently_enriched.xqy query (going by its name, those which have recently been enriched).

def get_highest_parser_version(self) -> tuple[int, int]:
1213    def get_highest_parser_version(self) -> tuple[int, int]:
1214        """This gets the highest parser version in the database, so if nothing has been parsed with the most recent version of the parser, this won't reflect that change."""
1215        table = json.loads(
1216            get_single_string_from_marklogic_response(
1217                self._send_to_eval(
1218                    {},
1219                    "get_highest_parser_version.xqy",
1220                ),
1221            ),
1222        )
1223
1224        return (int(table[1][1]), int(table[1][2]))

This gets the highest parser version in the database, so if nothing has been parsed with the most recent version of the parser, this won't reflect that change.

def get_documents_pending_parse_for_version( self, target_version: tuple[int, int], maximum_records: int = 1000) -> list[list[typing.Any]]:
1226    def get_documents_pending_parse_for_version(
1227        self,
1228        target_version: tuple[int, int],
1229        maximum_records: int = 1000,
1230    ) -> list[list[Any]]:
1231        """Retrieve a list of documents which are not yet parsed with a given version."""
1232        vars: query_dicts.GetPendingParseForVersionDocumentsDict = {
1233            "target_major_version": target_version[0],
1234            "target_minor_version": target_version[1],
1235            "maximum_records": maximum_records,
1236        }
1237        results: list[list[Any]] = json.loads(
1238            get_single_string_from_marklogic_response(
1239                self._send_to_eval(
1240                    vars,
1241                    "get_pending_parse_for_version_documents.xqy",
1242                ),
1243            ),
1244        )
1245
1246        return results

Retrieve a list of documents which are not yet parsed with a given version.

def get_count_pending_parse_for_version(self, target_version: tuple[int, int]) -> int:
1248    def get_count_pending_parse_for_version(
1249        self,
1250        target_version: tuple[int, int],
1251    ) -> int:
1252        """Get the total number of documents which are not yet parsed with a given version."""
1253        vars: query_dicts.GetPendingParseForVersionCountDict = {
1254            "target_major_version": target_version[0],
1255            "target_minor_version": target_version[1],
1256        }
1257        results = json.loads(
1258            get_single_string_from_marklogic_response(
1259                self._send_to_eval(
1260                    vars,
1261                    "get_pending_parse_for_version_count.xqy",
1262                ),
1263            ),
1264        )
1265
1266        return int(results[1][0])

Get the total number of documents which are not yet parsed with a given version.

def get_recently_parsed(self) -> list[list[typing.Any]]:
1268    def get_recently_parsed(
1269        self,
1270    ) -> list[list[Any]]:
1271        """Retrieve documents which are not yet enriched with a given version."""
1272        results: list[list[Any]] = json.loads(
1273            get_single_string_from_marklogic_response(
1274                self._send_to_eval(
1275                    {},
1276                    "get_recently_parsed.xqy",
1277                ),
1278            ),
1279        )
1280
1281        return results

Retrieve recently parsed documents, as reported by the `get_recently_parsed.xqy` query.

def get_locked_documents(self) -> list[caselawclient.types.DocumentLock]:
1283    def get_locked_documents(
1284        self,
1285    ) -> list[DocumentLock]:
1286        """Retrieve all currently locked documents."""
1287        results = [
1288            DocumentLock.from_string(lock)
1289            for lock in get_multipart_strings_from_marklogic_response(
1290                self._send_to_eval({}, "get_locked_documents.xqy")
1291            )
1292        ]
1293
1294        return sorted(results, key=lambda lock: lock.timestamp)

Retrieve all currently locked documents.

def get_missing_fclid(self, maximum_records: int = 50) -> list[str]:
1296    def get_missing_fclid(
1297        self,
1298        maximum_records: int = 50,
1299    ) -> list[str]:
1300        """Retrieve the URIs of published documents which do not have an identifier in the `fclid` schema."""
1301        vars: query_dicts.GetMissingFclidDict = {
1302            "maximum_records": maximum_records,
1303        }
1304
1305        results: list[str] = get_multipart_strings_from_marklogic_response(
1306            self._send_to_eval(
1307                vars,
1308                "get_missing_fclid.xqy",
1309            )
1310        )
1311
1312        return results

Retrieve the URIs of published documents which do not have an identifier in the fclid schema.

def resolve_from_identifier_slug( self, identifier_slug: caselawclient.types.DocumentIdentifierSlug, published_only: bool = True) -> caselawclient.identifier_resolution.IdentifierResolutions:
1314    def resolve_from_identifier_slug(
1315        self, identifier_slug: DocumentIdentifierSlug, published_only: bool = True
1316    ) -> IdentifierResolutions:
1317        """Given a PUI/EUI url, look up the precomputed slug and return the
1318        MarkLogic document URIs which match that slug. Multiple returns should be anticipated"""
1319        vars: query_dicts.ResolveFromIdentifierSlugDict = {
1320            "identifier_slug": identifier_slug,
1321            "published_only": int(published_only),
1322        }
1323        raw_results: list[str] = get_multipart_strings_from_marklogic_response(
1324            self._send_to_eval(
1325                vars,
1326                "resolve_from_identifier_slug.xqy",
1327            ),
1328        )
1329        return IdentifierResolutions.from_marklogic_output(raw_results)

Given a PUI/EUI URL, look up the precomputed slug and return the MarkLogic document URIs which match that slug. Multiple returns should be anticipated.

def resolve_from_identifier_value( self, identifier_value: caselawclient.types.DocumentIdentifierValue, published_only: bool = True) -> caselawclient.identifier_resolution.IdentifierResolutions:
1331    def resolve_from_identifier_value(
1332        self, identifier_value: DocumentIdentifierValue, published_only: bool = True
1333    ) -> IdentifierResolutions:
1334        """Given a PUI/EUI url, look up the precomputed slug and return the
1335        MarkLogic document URIs which match that slug. Multiple returns should be anticipated"""
1336        vars: query_dicts.ResolveFromIdentifierValueDict = {
1337            "identifier_value": identifier_value,
1338            "published_only": int(published_only),
1339        }
1340        raw_results: list[str] = get_multipart_strings_from_marklogic_response(
1341            self._send_to_eval(
1342                vars,
1343                "resolve_from_identifier_value.xqy",
1344            ),
1345        )
1346        return IdentifierResolutions.from_marklogic_output(raw_results)

Given an identifier value (rather than a slug), return the MarkLogic document URIs which match that value. Multiple returns should be anticipated.

def get_next_document_sequence_number(self) -> int:
1348    def get_next_document_sequence_number(self) -> int:
1349        """Increment the MarkLogic sequence number by one and return the value."""
1350        return int(self._eval_and_decode({}, "get_next_document_sequence_number.xqy"))

Increment the MarkLogic sequence number by one and return the value.