caselawclient.models.identifiers.collection
"""Collection type holding the identifiers attached to a document, keyed by identifier UUID."""

from typing import TYPE_CHECKING, Optional, Union

from lxml import etree

from caselawclient.types import SuccessFailureMessageTuple

from . import Identifier, IdentifierSchema
from .fclid import FindCaseLawIdentifier
from .neutral_citation import NeutralCitationNumber
from .press_summary_ncn import PressSummaryRelatedNCNIdentifier

if TYPE_CHECKING:
    from caselawclient.Client import MarklogicApiClient
    from caselawclient.models.documents import Document

# Identifier classes that `IdentifiersCollection.valid_new_identifier_types` chooses from.
SUPPORTED_IDENTIFIER_TYPES: list[type["Identifier"]] = [
    FindCaseLawIdentifier,
    NeutralCitationNumber,
    PressSummaryRelatedNCNIdentifier,
]


class IdentifiersCollection(dict[str, Identifier]):
    """A `dict` of identifier UUID to `Identifier`, with validation and lookup helpers."""

    def validate_uuids_match_keys(self) -> SuccessFailureMessageTuple:
        """Check that every key equals the `uuid` of the identifier stored under it.

        Only the first mismatch encountered is reported.
        """
        for uuid, identifier in self.items():
            if uuid != identifier.uuid:
                return SuccessFailureMessageTuple(
                    False, [f"Key of {identifier} in Identifiers is {uuid} not {identifier.uuid}"]
                )

        return SuccessFailureMessageTuple(True, [])

    def _list_all_identifiers_by_schema(self) -> dict[type[IdentifierSchema], list[Identifier]]:
        """Get a list of all identifiers, grouped by their schema."""
        identifiers_by_schema: dict[type[IdentifierSchema], list[Identifier]] = {}

        for identifier in self.values():
            identifiers_by_schema.setdefault(identifier.schema, []).append(identifier)

        return identifiers_by_schema

    def check_only_single_non_deprecated_identifier_where_multiples_not_allowed(self) -> SuccessFailureMessageTuple:
        """Check that only one non-deprecated identifier exists per schema where that schema does not allow multiples.

        Only the first offending schema encountered is reported.
        """

        for schema, identifiers in self._list_all_identifiers_by_schema().items():
            if schema.allow_multiple:
                continue
            non_deprecated_identifiers = [i for i in identifiers if not i.deprecated]
            if len(non_deprecated_identifiers) > 1:
                return SuccessFailureMessageTuple(
                    False,
                    [
                        f"Multiple non-deprecated identifiers found for schema '{schema.name}': {', '.join(i.value for i in non_deprecated_identifiers)}"
                    ],
                )

        return SuccessFailureMessageTuple(True, [])

    def _perform_collection_level_validations(self) -> SuccessFailureMessageTuple:
        """Perform identifier validations which are only possible at the collection level, such as UUID integrity and identifying exclusivity problems."""

        success = True
        messages: list[str] = []

        collection_validations_to_run: list[SuccessFailureMessageTuple] = [
            self.validate_uuids_match_keys(),
            self.check_only_single_non_deprecated_identifier_where_multiples_not_allowed(),
        ]

        for validation in collection_validations_to_run:
            if not validation.success:
                success = False
                # Messages are only collected from validations which failed.
                messages += validation.messages

        return SuccessFailureMessageTuple(success, messages)

    def _perform_identifier_level_validations(
        self, document_type: type["Document"], api_client: "MarklogicApiClient"
    ) -> SuccessFailureMessageTuple:
        """Perform identifier validations at the individual identifier level."""

        success = True
        messages: list[str] = []

        for identifier in self.values():
            validations = identifier.perform_all_validations(document_type=document_type, api_client=api_client)
            if validations.success is False:
                success = False

            # Unlike the collection-level pass, messages are kept whether or not the validation failed.
            messages += validations.messages

        return SuccessFailureMessageTuple(success, messages)

    def perform_all_validations(
        self, document_type: type["Document"], api_client: "MarklogicApiClient"
    ) -> SuccessFailureMessageTuple:
        """Perform all possible identifier validations on this collection, both at the individual and collection level."""

        identifier_level_success, identifier_level_messages = self._perform_identifier_level_validations(
            document_type=document_type, api_client=api_client
        )
        collection_level_success, collection_level_messages = self._perform_collection_level_validations()

        success = all([identifier_level_success, collection_level_success])
        all_messages = identifier_level_messages + collection_level_messages

        return SuccessFailureMessageTuple(success, all_messages)

    def contains(self, other_identifier: Identifier) -> bool:
        """Return whether any identifier in this collection is the `same_as` the given one."""
        return any(other_identifier.same_as(identifier) for identifier in self.values())

    def add(self, identifier: Identifier) -> None:
        """Store the identifier under its `uuid`, unless the collection already `contains` an equivalent one."""
        if not self.contains(identifier):
            self[identifier.uuid] = identifier

    def valid_new_identifier_types(self, document_type: type["Document"]) -> list[type[Identifier]]:
        """Return the supported identifier types which can be added to a document of the given type.

        A type qualifies when its schema allows editing and either places no restriction on document
        types or lists the name of `document_type`.

        NOTE(review): the identifiers already in this collection are not consulted here.
        """
        return [
            t
            for t in SUPPORTED_IDENTIFIER_TYPES
            if t.schema.allow_editing
            and (not t.schema.document_types or document_type.__name__ in t.schema.document_types)
        ]

    def __delitem__(self, key: Union[Identifier, str]) -> None:
        """Delete an entry, accepting either its UUID key or the `Identifier` itself."""
        if isinstance(key, Identifier):
            super().__delitem__(key.uuid)
        else:
            super().__delitem__(key)

    def of_type(self, identifier_type: type[Identifier]) -> list[Identifier]:
        """Return a list of all identifiers of a given type (including subclasses of it)."""
        return [identifier for identifier in self.values() if isinstance(identifier, identifier_type)]

    def delete_type(self, deleted_identifier_type: type[Identifier]) -> None:
        """Delete every identifier of the given type.

        For when we want an identifier to be the only valid identifier of that type: delete the others first.
        """
        # We could compare to .schema instead, which would have different behaviour for subclasses.
        # The matching keys are collected first so the dict is not mutated while being iterated.
        doomed_uuids = [
            uuid for uuid, identifier in self.items() if isinstance(identifier, deleted_identifier_type)
        ]
        for uuid in doomed_uuids:
            del self[uuid]

    @property
    def as_etree(self) -> etree._Element:
        """Return an etree representation of all the Document's identifiers."""
        identifiers_root = etree.Element("identifiers")

        for identifier in self.values():
            identifiers_root.append(identifier.as_xml_tree)

        return identifiers_root

    def by_score(self, type: Optional[type[Identifier]] = None) -> list[Identifier]:
        """
        :param type: Optionally, an identifier type to constrain this list to.

        :return: Return a list of identifiers, sorted by their score in descending order.
        """
        identifiers = self.of_type(type) if type else list(self.values())
        return sorted(identifiers, key=lambda v: v.score, reverse=True)

    def preferred(self, type: Optional[type[Identifier]] = None) -> Optional[Identifier]:
        """
        :param type: Optionally, an identifier type to constrain the results to.

        :return: Return the highest scoring identifier of the given type (or of any type, if none is specified). Returns `None` if no identifier is available.
        """
        # Rank once and reuse the result rather than sorting the collection twice.
        ranked = self.by_score(type)
        return ranked[0] if ranked else None
class IdentifiersCollection(dict[str, Identifier]):
    """A `dict` of identifier UUID to `Identifier`, with validation and lookup helpers."""

    def validate_uuids_match_keys(self) -> SuccessFailureMessageTuple:
        """Check that every key equals the `uuid` of the identifier stored under it.

        Only the first mismatch encountered is reported.
        """
        for uuid, identifier in self.items():
            if uuid != identifier.uuid:
                return SuccessFailureMessageTuple(
                    False, [f"Key of {identifier} in Identifiers is {uuid} not {identifier.uuid}"]
                )

        return SuccessFailureMessageTuple(True, [])

    def _list_all_identifiers_by_schema(self) -> dict[type[IdentifierSchema], list[Identifier]]:
        """Get a list of all identifiers, grouped by their schema."""
        identifiers_by_schema: dict[type[IdentifierSchema], list[Identifier]] = {}

        for identifier in self.values():
            identifiers_by_schema.setdefault(identifier.schema, []).append(identifier)

        return identifiers_by_schema

    def check_only_single_non_deprecated_identifier_where_multiples_not_allowed(self) -> SuccessFailureMessageTuple:
        """Check that only one non-deprecated identifier exists per schema where that schema does not allow multiples.

        Only the first offending schema encountered is reported.
        """

        for schema, identifiers in self._list_all_identifiers_by_schema().items():
            if schema.allow_multiple:
                continue
            non_deprecated_identifiers = [i for i in identifiers if not i.deprecated]
            if len(non_deprecated_identifiers) > 1:
                return SuccessFailureMessageTuple(
                    False,
                    [
                        f"Multiple non-deprecated identifiers found for schema '{schema.name}': {', '.join(i.value for i in non_deprecated_identifiers)}"
                    ],
                )

        return SuccessFailureMessageTuple(True, [])

    def _perform_collection_level_validations(self) -> SuccessFailureMessageTuple:
        """Perform identifier validations which are only possible at the collection level, such as UUID integrity and identifying exclusivity problems."""

        success = True
        messages: list[str] = []

        collection_validations_to_run: list[SuccessFailureMessageTuple] = [
            self.validate_uuids_match_keys(),
            self.check_only_single_non_deprecated_identifier_where_multiples_not_allowed(),
        ]

        for validation in collection_validations_to_run:
            if not validation.success:
                success = False
                # Messages are only collected from validations which failed.
                messages += validation.messages

        return SuccessFailureMessageTuple(success, messages)

    def _perform_identifier_level_validations(
        self, document_type: type["Document"], api_client: "MarklogicApiClient"
    ) -> SuccessFailureMessageTuple:
        """Perform identifier validations at the individual identifier level."""

        success = True
        messages: list[str] = []

        for identifier in self.values():
            validations = identifier.perform_all_validations(document_type=document_type, api_client=api_client)
            if validations.success is False:
                success = False

            # Unlike the collection-level pass, messages are kept whether or not the validation failed.
            messages += validations.messages

        return SuccessFailureMessageTuple(success, messages)

    def perform_all_validations(
        self, document_type: type["Document"], api_client: "MarklogicApiClient"
    ) -> SuccessFailureMessageTuple:
        """Perform all possible identifier validations on this collection, both at the individual and collection level."""

        identifier_level_success, identifier_level_messages = self._perform_identifier_level_validations(
            document_type=document_type, api_client=api_client
        )
        collection_level_success, collection_level_messages = self._perform_collection_level_validations()

        success = all([identifier_level_success, collection_level_success])
        all_messages = identifier_level_messages + collection_level_messages

        return SuccessFailureMessageTuple(success, all_messages)

    def contains(self, other_identifier: Identifier) -> bool:
        """Return whether any identifier in this collection is the `same_as` the given one."""
        return any(other_identifier.same_as(identifier) for identifier in self.values())

    def add(self, identifier: Identifier) -> None:
        """Store the identifier under its `uuid`, unless the collection already `contains` an equivalent one."""
        if not self.contains(identifier):
            self[identifier.uuid] = identifier

    def valid_new_identifier_types(self, document_type: type["Document"]) -> list[type[Identifier]]:
        """Return the supported identifier types which can be added to a document of the given type.

        A type qualifies when its schema allows editing and either places no restriction on document
        types or lists the name of `document_type`.

        NOTE(review): the identifiers already in this collection are not consulted here.
        """
        return [
            t
            for t in SUPPORTED_IDENTIFIER_TYPES
            if t.schema.allow_editing
            and (not t.schema.document_types or document_type.__name__ in t.schema.document_types)
        ]

    def __delitem__(self, key: Union[Identifier, str]) -> None:
        """Delete an entry, accepting either its UUID key or the `Identifier` itself."""
        if isinstance(key, Identifier):
            super().__delitem__(key.uuid)
        else:
            super().__delitem__(key)

    def of_type(self, identifier_type: type[Identifier]) -> list[Identifier]:
        """Return a list of all identifiers of a given type (including subclasses of it)."""
        return [identifier for identifier in self.values() if isinstance(identifier, identifier_type)]

    def delete_type(self, deleted_identifier_type: type[Identifier]) -> None:
        """Delete every identifier of the given type.

        For when we want an identifier to be the only valid identifier of that type: delete the others first.
        """
        # We could compare to .schema instead, which would have different behaviour for subclasses.
        # The matching keys are collected first so the dict is not mutated while being iterated.
        doomed_uuids = [
            uuid for uuid, identifier in self.items() if isinstance(identifier, deleted_identifier_type)
        ]
        for uuid in doomed_uuids:
            del self[uuid]

    @property
    def as_etree(self) -> etree._Element:
        """Return an etree representation of all the Document's identifiers."""
        identifiers_root = etree.Element("identifiers")

        for identifier in self.values():
            identifiers_root.append(identifier.as_xml_tree)

        return identifiers_root

    def by_score(self, type: Optional[type[Identifier]] = None) -> list[Identifier]:
        """
        :param type: Optionally, an identifier type to constrain this list to.

        :return: Return a list of identifiers, sorted by their score in descending order.
        """
        identifiers = self.of_type(type) if type else list(self.values())
        return sorted(identifiers, key=lambda v: v.score, reverse=True)

    def preferred(self, type: Optional[type[Identifier]] = None) -> Optional[Identifier]:
        """
        :param type: Optionally, an identifier type to constrain the results to.

        :return: Return the highest scoring identifier of the given type (or of any type, if none is specified). Returns `None` if no identifier is available.
        """
        # Rank once and reuse the result rather than sorting the collection twice.
        ranked = self.by_score(type)
        return ranked[0] if ranked else None
def validate_uuids_match_keys(self) -> SuccessFailureMessageTuple:
    """Check that every key in the collection equals the `uuid` of the identifier stored under it.

    :return: A success tuple with no messages when all keys match; otherwise a failure tuple
        describing the first mismatch encountered.
    """
    first_mismatch = next(
        ((key, candidate) for key, candidate in self.items() if key != candidate.uuid),
        None,
    )
    if first_mismatch is None:
        return SuccessFailureMessageTuple(True, [])

    uuid, identifier = first_mismatch
    return SuccessFailureMessageTuple(
        False, [f"Key of {identifier} in Identifiers is {uuid} not {identifier.uuid}"]
    )
def check_only_single_non_deprecated_identifier_where_multiples_not_allowed(self) -> SuccessFailureMessageTuple:
    """Check that schemas which do not allow multiples have at most one non-deprecated identifier.

    :return: A success tuple with no messages when every such schema passes; otherwise a failure
        tuple naming the first offending schema and the values of its non-deprecated identifiers.
    """
    for schema, candidates in self._list_all_identifiers_by_schema().items():
        # Schemas which permit multiples can never fail this check.
        if schema.allow_multiple:
            continue

        non_deprecated_identifiers = [i for i in candidates if not i.deprecated]
        if len(non_deprecated_identifiers) <= 1:
            continue

        message = f"Multiple non-deprecated identifiers found for schema '{schema.name}': {', '.join(i.value for i in non_deprecated_identifiers)}"
        return SuccessFailureMessageTuple(False, [message])

    return SuccessFailureMessageTuple(True, [])
Check that only one non-deprecated identifier exists per schema where that schema does not allow multiples.
def perform_all_validations(
    self, document_type: type["Document"], api_client: "MarklogicApiClient"
) -> SuccessFailureMessageTuple:
    """Run both the identifier-level and the collection-level validations and combine the results.

    :param document_type: Passed through to the identifier-level validations.
    :param api_client: Passed through to the identifier-level validations.

    :return: A tuple which is successful only if both passes succeeded, carrying the
        identifier-level messages followed by the collection-level messages.
    """
    # Identifier-level checks run first, so their messages come first in the combined list.
    per_identifier_ok, per_identifier_messages = self._perform_identifier_level_validations(
        document_type=document_type, api_client=api_client
    )
    per_collection_ok, per_collection_messages = self._perform_collection_level_validations()

    return SuccessFailureMessageTuple(
        all([per_identifier_ok, per_collection_ok]),
        per_identifier_messages + per_collection_messages,
    )
Perform all possible identifier validations on this collection, both at the individual and collection level.
def contains(self, other_identifier: Identifier) -> bool:
    """Return whether any identifier in this collection is the `same_as` the given one.

    :param other_identifier: The identifier to look for; its `same_as` method does the comparison.
    """
    for existing in self.values():
        if other_identifier.same_as(existing):
            return True
    return False
Does the identifier's value and namespace already exist in this group?
def valid_new_identifier_types(self, document_type: type["Document"]) -> list[type[Identifier]]:
    """Return the supported identifier types which can be added to a document of the given type.

    A type qualifies when its schema allows editing and either places no restriction on document
    types or lists the name of `document_type`.

    NOTE(review): the identifiers already in this collection are not consulted here.
    """
    permitted: list[type[Identifier]] = []

    for candidate in SUPPORTED_IDENTIFIER_TYPES:
        if not candidate.schema.allow_editing:
            continue
        # An empty `document_types` means the schema is not restricted to particular document types.
        if candidate.schema.document_types and document_type.__name__ not in candidate.schema.document_types:
            continue
        permitted.append(candidate)

    return permitted
Return a list of identifier types which can be added to a document of the given type, given identifiers already in this collection.
def of_type(self, identifier_type: type[Identifier]) -> list[Identifier]:
    """Return every identifier in the collection which is an instance of the given type.

    Matching uses `isinstance`, so instances of subclasses are included. Results keep the
    collection's iteration order.
    """
    matches = []
    for candidate in self.values():
        if isinstance(candidate, identifier_type):
            matches.append(candidate)
    return matches
Return a list of all identifiers of a given type.
def delete_type(self, deleted_identifier_type: type[Identifier]) -> None:
    """Delete every identifier of the given type from the collection.

    For when we want an identifier to be the only valid identifier of that type: delete the others first.
    """
    # Matching is by `isinstance`; comparing `.schema` instead would behave differently for subclasses.
    # The keys to remove are gathered up front so the dict is not mutated while being iterated.
    doomed_uuids = [key for key, candidate in self.items() if isinstance(candidate, deleted_identifier_type)]
    for key in doomed_uuids:
        del self[key]
Delete every identifier of the given type. Use this first when a new identifier should be the only valid identifier of that type.
@property
def as_etree(self) -> etree._Element:
    """Build an `<identifiers>` element whose children are each identifier's `as_xml_tree`.

    Children are added in the collection's iteration order.
    """
    root = etree.Element("identifiers")
    root.extend(identifier.as_xml_tree for identifier in self.values())
    return root
Return an etree representation of all the Document's identifiers.
def by_score(self, type: Optional[type[Identifier]] = None) -> list[Identifier]:
    """
    :param type: Optionally, an identifier type to constrain this list to.

    :return: A new list of identifiers ordered by `score`, highest first.
    """

    def _score(identifier):
        return identifier.score

    if type:
        candidates = self.of_type(type)
    else:
        candidates = list(self.values())

    return sorted(candidates, key=_score, reverse=True)
Parameters
- type: Optionally, an identifier type to constrain this list to.
Returns
Return a list of identifiers, sorted by their score in descending order.
def preferred(self, type: Optional[type[Identifier]] = None) -> Optional[Identifier]:
    """
    :param type: Optionally, an identifier type to constrain the results to.

    :return: Return the highest scoring identifier of the given type (or of any type, if none is specified). Returns `None` if no identifier is available.
    """
    # Rank once and reuse the result rather than sorting the collection twice.
    ranked = self.by_score(type)
    return ranked[0] if ranked else None
Parameters
- type: Optionally, an identifier type to constrain the results to.
Returns
Return the highest scoring identifier of the given type (or of any type, if none is specified). Returns `None` if no identifier is available.