caselawclient.models.identifiers.collection

  1from typing import TYPE_CHECKING, Optional, Union
  2
  3from lxml import etree
  4
  5from caselawclient.types import SuccessFailureMessageTuple
  6
  7from . import Identifier, IdentifierSchema
  8from .fclid import FindCaseLawIdentifier
  9from .neutral_citation import NeutralCitationNumber
 10from .press_summary_ncn import PressSummaryRelatedNCNIdentifier
 11
 12if TYPE_CHECKING:
 13    from caselawclient.Client import MarklogicApiClient
 14    from caselawclient.models.documents import Document
 15
# Identifier classes that `IdentifiersCollection.valid_new_identifier_types` considers
# when working out which identifier types may be added to a document.
SUPPORTED_IDENTIFIER_TYPES: list[type["Identifier"]] = [
    FindCaseLawIdentifier,
    NeutralCitationNumber,
    PressSummaryRelatedNCNIdentifier,
]
 21
 22
 23class IdentifiersCollection(dict[str, Identifier]):
 24    def validate_uuids_match_keys(self) -> SuccessFailureMessageTuple:
 25        for uuid, identifier in self.items():
 26            if uuid != identifier.uuid:
 27                return SuccessFailureMessageTuple(
 28                    False, [f"Key of {identifier} in Identifiers is {uuid} not {identifier.uuid}"]
 29                )
 30
 31        return SuccessFailureMessageTuple(True, [])
 32
 33    def _list_all_identifiers_by_schema(self) -> dict[type[IdentifierSchema], list[Identifier]]:
 34        """Get a list of all identifiers, grouped by their schema."""
 35        identifiers_by_schema: dict[type[IdentifierSchema], list[Identifier]] = {}
 36
 37        for identifier in self.values():
 38            identifiers_by_schema.setdefault(identifier.schema, []).append(identifier)
 39
 40        return identifiers_by_schema
 41
 42    def check_only_single_non_deprecated_identifier_where_multiples_not_allowed(self) -> SuccessFailureMessageTuple:
 43        """Check that only one non-deprecated identifier exists per schema where that schema does not allow multiples."""
 44
 45        for schema, identifiers in self._list_all_identifiers_by_schema().items():
 46            if schema.allow_multiple:
 47                continue
 48            non_deprecated_identifiers = [i for i in identifiers if not i.deprecated]
 49            if len(non_deprecated_identifiers) > 1:
 50                return SuccessFailureMessageTuple(
 51                    False,
 52                    [
 53                        f"Multiple non-deprecated identifiers found for schema '{schema.name}': {', '.join(i.value for i in non_deprecated_identifiers)}"
 54                    ],
 55                )
 56
 57        return SuccessFailureMessageTuple(True, [])
 58
 59    def _perform_collection_level_validations(self) -> SuccessFailureMessageTuple:
 60        """Perform identifier validations which are only possible at the collection level, such as UUID integrity and identifying exclusivity problems."""
 61
 62        success = True
 63        messages: list[str] = []
 64
 65        collection_validations_to_run: list[SuccessFailureMessageTuple] = [
 66            self.validate_uuids_match_keys(),
 67            self.check_only_single_non_deprecated_identifier_where_multiples_not_allowed(),
 68        ]
 69
 70        for validation in collection_validations_to_run:
 71            if not validation.success:
 72                success = False
 73                messages += validation.messages
 74
 75        return SuccessFailureMessageTuple(success, messages)
 76
 77    def _perform_identifier_level_validations(
 78        self, document_type: type["Document"], api_client: "MarklogicApiClient"
 79    ) -> SuccessFailureMessageTuple:
 80        """Perform identifier validations at the individual identifier level."""
 81
 82        success = True
 83        messages: list[str] = []
 84
 85        for _, identifier in self.items():
 86            validations = identifier.perform_all_validations(document_type=document_type, api_client=api_client)
 87            if validations.success is False:
 88                success = False
 89
 90            messages += validations.messages
 91
 92        return SuccessFailureMessageTuple(success, messages)
 93
 94    def perform_all_validations(
 95        self, document_type: type["Document"], api_client: "MarklogicApiClient"
 96    ) -> SuccessFailureMessageTuple:
 97        """Perform all possible identifier validations on this collection, both at the individual and collection level."""
 98
 99        identifier_level_success, identifier_level_messages = self._perform_identifier_level_validations(
100            document_type=document_type, api_client=api_client
101        )
102        collection_level_success, collection_level_messages = self._perform_collection_level_validations()
103
104        success = all([identifier_level_success, collection_level_success])
105        all_messages = identifier_level_messages + collection_level_messages
106
107        return SuccessFailureMessageTuple(success, all_messages)
108
109    def contains(self, other_identifier: Identifier) -> bool:
110        """Does the identifier's value and namespace already exist in this group?"""
111        return any(other_identifier.same_as(identifier) for identifier in self.values())
112
113    def add(self, identifier: Identifier) -> None:
114        if not self.contains(identifier):
115            self[identifier.uuid] = identifier
116
117    def valid_new_identifier_types(self, document_type: type["Document"]) -> list[type[Identifier]]:
118        """Return a list of identifier types which can be added to a document of the given type, given identifiers already in this collection."""
119        return [
120            t
121            for t in SUPPORTED_IDENTIFIER_TYPES
122            if t.schema.allow_editing
123            and (not t.schema.document_types or document_type.__name__ in t.schema.document_types)
124        ]
125
126    def __delitem__(self, key: Union[Identifier, str]) -> None:
127        if isinstance(key, Identifier):
128            super().__delitem__(key.uuid)
129        else:
130            super().__delitem__(key)
131
132    def of_type(self, identifier_type: type[Identifier]) -> list[Identifier]:
133        """Return a list of all identifiers of a given type."""
134        uuids = self.keys()
135        return [self[uuid] for uuid in list(uuids) if isinstance(self[uuid], identifier_type)]
136
137    def delete_type(self, deleted_identifier_type: type[Identifier]) -> None:
138        "For when we want an identifier to be the only valid identifier of that type, delete the others first"
139        uuids = self.keys()
140        for uuid in list(uuids):
141            # we could use compare to .schema instead, which would have diffferent behaviour for subclasses
142            if isinstance(self[uuid], deleted_identifier_type):
143                del self[uuid]
144
145    @property
146    def as_etree(self) -> etree._Element:
147        """Return an etree representation of all the Document's identifiers."""
148        identifiers_root = etree.Element("identifiers")
149
150        for identifier in self.values():
151            identifiers_root.append(identifier.as_xml_tree)
152
153        return identifiers_root
154
155    def by_score(self, type: Optional[type[Identifier]] = None) -> list[Identifier]:
156        """
157        :param type: Optionally, an identifier type to constrain this list to.
158
159        :return: Return a list of identifiers, sorted by their score in descending order.
160        """
161        identifiers = self.of_type(type) if type else list(self.values())
162        return sorted(identifiers, key=lambda v: v.score, reverse=True)
163
164    def preferred(self, type: Optional[type[Identifier]] = None) -> Optional[Identifier]:
165        """
166        :param type: Optionally, an identifier type to constrain the results to.
167
168        :return: Return the highest scoring identifier of the given type (or of any type, if none is specified). Returns `None` if no identifier is available.
169        """
170        if len(self.by_score(type)) == 0:
171            return None
172        return self.by_score(type)[0]
class IdentifiersCollection(dict[str, caselawclient.models.identifiers.Identifier]):
 24class IdentifiersCollection(dict[str, Identifier]):
 25    def validate_uuids_match_keys(self) -> SuccessFailureMessageTuple:
 26        for uuid, identifier in self.items():
 27            if uuid != identifier.uuid:
 28                return SuccessFailureMessageTuple(
 29                    False, [f"Key of {identifier} in Identifiers is {uuid} not {identifier.uuid}"]
 30                )
 31
 32        return SuccessFailureMessageTuple(True, [])
 33
 34    def _list_all_identifiers_by_schema(self) -> dict[type[IdentifierSchema], list[Identifier]]:
 35        """Get a list of all identifiers, grouped by their schema."""
 36        identifiers_by_schema: dict[type[IdentifierSchema], list[Identifier]] = {}
 37
 38        for identifier in self.values():
 39            identifiers_by_schema.setdefault(identifier.schema, []).append(identifier)
 40
 41        return identifiers_by_schema
 42
 43    def check_only_single_non_deprecated_identifier_where_multiples_not_allowed(self) -> SuccessFailureMessageTuple:
 44        """Check that only one non-deprecated identifier exists per schema where that schema does not allow multiples."""
 45
 46        for schema, identifiers in self._list_all_identifiers_by_schema().items():
 47            if schema.allow_multiple:
 48                continue
 49            non_deprecated_identifiers = [i for i in identifiers if not i.deprecated]
 50            if len(non_deprecated_identifiers) > 1:
 51                return SuccessFailureMessageTuple(
 52                    False,
 53                    [
 54                        f"Multiple non-deprecated identifiers found for schema '{schema.name}': {', '.join(i.value for i in non_deprecated_identifiers)}"
 55                    ],
 56                )
 57
 58        return SuccessFailureMessageTuple(True, [])
 59
 60    def _perform_collection_level_validations(self) -> SuccessFailureMessageTuple:
 61        """Perform identifier validations which are only possible at the collection level, such as UUID integrity and identifying exclusivity problems."""
 62
 63        success = True
 64        messages: list[str] = []
 65
 66        collection_validations_to_run: list[SuccessFailureMessageTuple] = [
 67            self.validate_uuids_match_keys(),
 68            self.check_only_single_non_deprecated_identifier_where_multiples_not_allowed(),
 69        ]
 70
 71        for validation in collection_validations_to_run:
 72            if not validation.success:
 73                success = False
 74                messages += validation.messages
 75
 76        return SuccessFailureMessageTuple(success, messages)
 77
 78    def _perform_identifier_level_validations(
 79        self, document_type: type["Document"], api_client: "MarklogicApiClient"
 80    ) -> SuccessFailureMessageTuple:
 81        """Perform identifier validations at the individual identifier level."""
 82
 83        success = True
 84        messages: list[str] = []
 85
 86        for _, identifier in self.items():
 87            validations = identifier.perform_all_validations(document_type=document_type, api_client=api_client)
 88            if validations.success is False:
 89                success = False
 90
 91            messages += validations.messages
 92
 93        return SuccessFailureMessageTuple(success, messages)
 94
 95    def perform_all_validations(
 96        self, document_type: type["Document"], api_client: "MarklogicApiClient"
 97    ) -> SuccessFailureMessageTuple:
 98        """Perform all possible identifier validations on this collection, both at the individual and collection level."""
 99
100        identifier_level_success, identifier_level_messages = self._perform_identifier_level_validations(
101            document_type=document_type, api_client=api_client
102        )
103        collection_level_success, collection_level_messages = self._perform_collection_level_validations()
104
105        success = all([identifier_level_success, collection_level_success])
106        all_messages = identifier_level_messages + collection_level_messages
107
108        return SuccessFailureMessageTuple(success, all_messages)
109
110    def contains(self, other_identifier: Identifier) -> bool:
111        """Does the identifier's value and namespace already exist in this group?"""
112        return any(other_identifier.same_as(identifier) for identifier in self.values())
113
114    def add(self, identifier: Identifier) -> None:
115        if not self.contains(identifier):
116            self[identifier.uuid] = identifier
117
118    def valid_new_identifier_types(self, document_type: type["Document"]) -> list[type[Identifier]]:
119        """Return a list of identifier types which can be added to a document of the given type, given identifiers already in this collection."""
120        return [
121            t
122            for t in SUPPORTED_IDENTIFIER_TYPES
123            if t.schema.allow_editing
124            and (not t.schema.document_types or document_type.__name__ in t.schema.document_types)
125        ]
126
127    def __delitem__(self, key: Union[Identifier, str]) -> None:
128        if isinstance(key, Identifier):
129            super().__delitem__(key.uuid)
130        else:
131            super().__delitem__(key)
132
133    def of_type(self, identifier_type: type[Identifier]) -> list[Identifier]:
134        """Return a list of all identifiers of a given type."""
135        uuids = self.keys()
136        return [self[uuid] for uuid in list(uuids) if isinstance(self[uuid], identifier_type)]
137
138    def delete_type(self, deleted_identifier_type: type[Identifier]) -> None:
139        "For when we want an identifier to be the only valid identifier of that type, delete the others first"
140        uuids = self.keys()
141        for uuid in list(uuids):
142            # we could use compare to .schema instead, which would have diffferent behaviour for subclasses
143            if isinstance(self[uuid], deleted_identifier_type):
144                del self[uuid]
145
146    @property
147    def as_etree(self) -> etree._Element:
148        """Return an etree representation of all the Document's identifiers."""
149        identifiers_root = etree.Element("identifiers")
150
151        for identifier in self.values():
152            identifiers_root.append(identifier.as_xml_tree)
153
154        return identifiers_root
155
156    def by_score(self, type: Optional[type[Identifier]] = None) -> list[Identifier]:
157        """
158        :param type: Optionally, an identifier type to constrain this list to.
159
160        :return: Return a list of identifiers, sorted by their score in descending order.
161        """
162        identifiers = self.of_type(type) if type else list(self.values())
163        return sorted(identifiers, key=lambda v: v.score, reverse=True)
164
165    def preferred(self, type: Optional[type[Identifier]] = None) -> Optional[Identifier]:
166        """
167        :param type: Optionally, an identifier type to constrain the results to.
168
169        :return: Return the highest scoring identifier of the given type (or of any type, if none is specified). Returns `None` if no identifier is available.
170        """
171        if len(self.by_score(type)) == 0:
172            return None
173        return self.by_score(type)[0]
def validate_uuids_match_keys(self) -> caselawclient.types.SuccessFailureMessageTuple:
25    def validate_uuids_match_keys(self) -> SuccessFailureMessageTuple:
26        for uuid, identifier in self.items():
27            if uuid != identifier.uuid:
28                return SuccessFailureMessageTuple(
29                    False, [f"Key of {identifier} in Identifiers is {uuid} not {identifier.uuid}"]
30                )
31
32        return SuccessFailureMessageTuple(True, [])
def check_only_single_non_deprecated_identifier_where_multiples_not_allowed(self) -> caselawclient.types.SuccessFailureMessageTuple:
43    def check_only_single_non_deprecated_identifier_where_multiples_not_allowed(self) -> SuccessFailureMessageTuple:
44        """Check that only one non-deprecated identifier exists per schema where that schema does not allow multiples."""
45
46        for schema, identifiers in self._list_all_identifiers_by_schema().items():
47            if schema.allow_multiple:
48                continue
49            non_deprecated_identifiers = [i for i in identifiers if not i.deprecated]
50            if len(non_deprecated_identifiers) > 1:
51                return SuccessFailureMessageTuple(
52                    False,
53                    [
54                        f"Multiple non-deprecated identifiers found for schema '{schema.name}': {', '.join(i.value for i in non_deprecated_identifiers)}"
55                    ],
56                )
57
58        return SuccessFailureMessageTuple(True, [])

Check that only one non-deprecated identifier exists per schema where that schema does not allow multiples.

def perform_all_validations( self, document_type: type[caselawclient.models.documents.Document], api_client: caselawclient.Client.MarklogicApiClient) -> caselawclient.types.SuccessFailureMessageTuple:
 95    def perform_all_validations(
 96        self, document_type: type["Document"], api_client: "MarklogicApiClient"
 97    ) -> SuccessFailureMessageTuple:
 98        """Perform all possible identifier validations on this collection, both at the individual and collection level."""
 99
100        identifier_level_success, identifier_level_messages = self._perform_identifier_level_validations(
101            document_type=document_type, api_client=api_client
102        )
103        collection_level_success, collection_level_messages = self._perform_collection_level_validations()
104
105        success = all([identifier_level_success, collection_level_success])
106        all_messages = identifier_level_messages + collection_level_messages
107
108        return SuccessFailureMessageTuple(success, all_messages)

Perform all possible identifier validations on this collection, both at the individual and collection level.

def contains( self, other_identifier: caselawclient.models.identifiers.Identifier) -> bool:
110    def contains(self, other_identifier: Identifier) -> bool:
111        """Does the identifier's value and namespace already exist in this group?"""
112        return any(other_identifier.same_as(identifier) for identifier in self.values())

Does the identifier's value and namespace already exist in this group?

def add(self, identifier: caselawclient.models.identifiers.Identifier) -> None:
114    def add(self, identifier: Identifier) -> None:
115        if not self.contains(identifier):
116            self[identifier.uuid] = identifier
def valid_new_identifier_types( self, document_type: type[caselawclient.models.documents.Document]) -> list[type[caselawclient.models.identifiers.Identifier]]:
118    def valid_new_identifier_types(self, document_type: type["Document"]) -> list[type[Identifier]]:
119        """Return a list of identifier types which can be added to a document of the given type, given identifiers already in this collection."""
120        return [
121            t
122            for t in SUPPORTED_IDENTIFIER_TYPES
123            if t.schema.allow_editing
124            and (not t.schema.document_types or document_type.__name__ in t.schema.document_types)
125        ]

Return a list of identifier types which can be added to a document of the given type, given identifiers already in this collection.

def of_type( self, identifier_type: type[caselawclient.models.identifiers.Identifier]) -> list[caselawclient.models.identifiers.Identifier]:
133    def of_type(self, identifier_type: type[Identifier]) -> list[Identifier]:
134        """Return a list of all identifiers of a given type."""
135        uuids = self.keys()
136        return [self[uuid] for uuid in list(uuids) if isinstance(self[uuid], identifier_type)]

Return a list of all identifiers of a given type.

def delete_type( self, deleted_identifier_type: type[caselawclient.models.identifiers.Identifier]) -> None:
138    def delete_type(self, deleted_identifier_type: type[Identifier]) -> None:
139        "For when we want an identifier to be the only valid identifier of that type, delete the others first"
140        uuids = self.keys()
141        for uuid in list(uuids):
142            # we could use compare to .schema instead, which would have diffferent behaviour for subclasses
143            if isinstance(self[uuid], deleted_identifier_type):
144                del self[uuid]

Delete all identifiers of the given type. Use this when a new identifier should be the only valid identifier of that type: delete the others first.

as_etree: lxml.etree._Element
146    @property
147    def as_etree(self) -> etree._Element:
148        """Return an etree representation of all the Document's identifiers."""
149        identifiers_root = etree.Element("identifiers")
150
151        for identifier in self.values():
152            identifiers_root.append(identifier.as_xml_tree)
153
154        return identifiers_root

Return an etree representation of all the Document's identifiers.

def by_score( self, type: Optional[type[caselawclient.models.identifiers.Identifier]] = None) -> list[caselawclient.models.identifiers.Identifier]:
156    def by_score(self, type: Optional[type[Identifier]] = None) -> list[Identifier]:
157        """
158        :param type: Optionally, an identifier type to constrain this list to.
159
160        :return: Return a list of identifiers, sorted by their score in descending order.
161        """
162        identifiers = self.of_type(type) if type else list(self.values())
163        return sorted(identifiers, key=lambda v: v.score, reverse=True)
Parameters
  • type: Optionally, an identifier type to constrain this list to.
Returns

Return a list of identifiers, sorted by their score in descending order.

def preferred( self, type: Optional[type[caselawclient.models.identifiers.Identifier]] = None) -> Optional[caselawclient.models.identifiers.Identifier]:
165    def preferred(self, type: Optional[type[Identifier]] = None) -> Optional[Identifier]:
166        """
167        :param type: Optionally, an identifier type to constrain the results to.
168
169        :return: Return the highest scoring identifier of the given type (or of any type, if none is specified). Returns `None` if no identifier is available.
170        """
171        if len(self.by_score(type)) == 0:
172            return None
173        return self.by_score(type)[0]
Parameters
  • type: Optionally, an identifier type to constrain the results to.
Returns

Return the highest scoring identifier of the given type (or of any type, if none is specified). Returns None if no identifier is available.