spdx_tools.spdx.parser.rdf.graph_parsing_functions
# SPDX-FileCopyrightText: 2023 spdx contributors
#
# SPDX-License-Identifier: Apache-2.0
from enum import Enum

from beartype.typing import Any, Callable, Iterator, Optional, Tuple, Type, Union
from rdflib import RDF, Graph, URIRef
from rdflib.exceptions import UniquenessError
from rdflib.namespace import NamespaceManager
from rdflib.term import BNode, Literal, Node

from spdx_tools.spdx.casing_tools import camel_case_to_snake_case
from spdx_tools.spdx.model import SpdxNoAssertion, SpdxNone
from spdx_tools.spdx.model.spdx_no_assertion import SPDX_NO_ASSERTION_STRING
from spdx_tools.spdx.model.spdx_none import SPDX_NONE_STRING
from spdx_tools.spdx.parser.error import SPDXParsingError
from spdx_tools.spdx.parser.logger import Logger
from spdx_tools.spdx.rdfschema.namespace import SPDX_NAMESPACE


def parse_literal(
    logger: Logger,
    graph: Graph,
    subject: Node,
    predicate: Node,
    parsing_method: Callable = lambda x: x.strip(),
    default: Any = None,
):
    """Look up the unique value of ``predicate`` for ``subject`` and parse it.

    The value is fetched with ``get_unique_value``. A falsy result yields ``default``; otherwise the value is
    handed to ``apply_parsing_method_or_log_error``, which returns ``default`` when parsing fails.
    """
    value = get_unique_value(logger, graph, subject, predicate, default)
    if not value:
        return default
    return apply_parsing_method_or_log_error(logger, value, parsing_method, default)


def apply_parsing_method_or_log_error(
    logger: Logger, value: Any, parsing_method: Callable = lambda x: x.strip(), default: Any = None
):
    """Return ``parsing_method(value)``; on a known parsing failure log it and return ``default``.

    ``SPDXParsingError`` contributes all of its messages to ``logger``. ``TypeError`` and ``ValueError``
    contribute their first argument, or a generic message when they were raised without arguments.
    Any other exception propagates to the caller.
    """
    try:
        return parsing_method(value)
    except SPDXParsingError as err:
        logger.extend(err.get_messages())
    except (TypeError, ValueError) as err:
        # An exception raised without arguments has an empty ``args`` tuple; indexing it would raise
        # IndexError and mask the original problem, so fall back to a generic message.
        message = err.args[0] if err.args else f"{type(err).__name__} raised while parsing {value!r}"
        logger.append(message)
    return default


def parse_literal_or_no_assertion_or_none(
    logger: Logger,
    graph: Graph,
    subject: Node,
    predicate: Node,
    parsing_method: Callable = lambda x: x.strip(),
    default: Any = None,
):
    """Like ``parse_literal``, but maps the NOASSERTION / NONE markers to their model objects.

    The unique value is fetched with ``get_unique_value`` and converted by ``get_correctly_typed_value``.
    """
    value = get_unique_value(logger, graph, subject, predicate, default)
    return get_correctly_typed_value(logger, value, parsing_method, default)


def get_correctly_typed_value(
    logger: Logger, value: Any, parsing_method: Callable = lambda x: x.strip(), default: Any = None
):
    """Convert ``value`` to ``SpdxNoAssertion``, ``SpdxNone`` or the result of ``parsing_method``.

    A falsy ``value`` yields ``default``. The marker check accepts both the namespace term and the
    plain string form (compared against ``value.toPython()``).
    """
    if not value:
        return default
    if value == SPDX_NAMESPACE.noassertion or value.toPython() == SPDX_NO_ASSERTION_STRING:
        return SpdxNoAssertion()
    if value == SPDX_NAMESPACE.none or value.toPython() == SPDX_NONE_STRING:
        return SpdxNone()
    return apply_parsing_method_or_log_error(logger, value, parsing_method, default)


def get_unique_value(logger: Logger, graph: Graph, subject: Node, predicate: Node, default: Any) -> Any:
    """Return ``graph.value(...)`` for ``subject``/``predicate`` with ``any=False``.

    If the lookup raises ``UniquenessError``, a message is appended to ``logger`` and ``default`` is returned.
    """
    try:
        value = graph.value(subject=subject, predicate=predicate, default=default, any=False)
        return value
    except UniquenessError:
        logger.append(f"Multiple values for unique value {predicate} found.")
        return default


def parse_enum_value(enum_str: str, enum_class: Type[Enum], prefix: str) -> Enum:
    """Translate ``enum_str`` into a member of ``enum_class``.

    ``prefix`` is stripped from the start of ``enum_str``, the remainder is converted with
    ``camel_case_to_snake_case`` and upper-cased, and the result is used as the member name.

    Raises:
        SPDXParsingError: if the lookup raises ``KeyError``; the ``KeyError`` is attached as the cause.
    """
    try:
        enum_without_rdf_prefix = remove_prefix(enum_str, prefix)
        value = camel_case_to_snake_case(enum_without_rdf_prefix).upper()
        return enum_class[value]
    except KeyError as err:
        raise SPDXParsingError([f"Invalid value for {enum_class}: {enum_str}"]) from err


def parse_spdx_id(resource: Union[URIRef, BNode], doc_namespace: str, graph: Graph) -> Optional[str]:
    """Derive an id string from ``resource``.

    Returns ``None`` for a falsy resource or a ``BNode``. A resource starting with ``"{doc_namespace}#"``
    yields its fragment; any other resource containing ``#`` is passed through
    ``NamespaceManager(graph).normalizeUri``; everything else is returned via ``toPython()``.
    """
    if not resource or isinstance(resource, BNode):
        return None
    if resource.startswith(f"{doc_namespace}#"):
        return resource.fragment
    if "#" in resource:
        namespace_manager = NamespaceManager(graph)
        return namespace_manager.normalizeUri(resource)
    return resource.toPython() or None


# Python 3.9 introduced the method removeprefix() for strings, but as we are also supporting Python 3.7 and 3.8 we need
# to write our own helper method to delete prefixes.
def remove_prefix(string: str, prefix: str) -> str:
    """Return ``string`` without a leading ``prefix``; return it unchanged if it does not start with ``prefix``."""
    if string.startswith(prefix):
        return string[len(prefix) :]
    return string


def get_correctly_typed_triples(
    logger: Logger,
    graph: Graph,
    subject: Optional[Node] = None,
    predicate: Optional[Node] = None,
    _object: Optional[Node] = None,
) -> Iterator[Tuple[Union[BNode, URIRef], Node, Union[BNode, Literal, URIRef]]]:
    """Yield every triple of ``graph`` matching the given pattern, logging unexpected node types.

    A warning is appended to ``logger`` when a subject is not a ``BNode``/``URIRef`` or an object is not a
    ``BNode``/``Literal``/``URIRef``. The triple is yielded unchanged either way.
    """
    # this is a helper method to cast some rdf types from graph.triples() to be compatible with the
    # code that follows
    for s, p, o in graph.triples((subject, predicate, _object)):
        if not isinstance(s, (BNode, URIRef)):
            logger.append(
                f"Warning: Subject {s} should be of type BNode or URIRef, but is {type(s).__name__}. "
                f"This might lead to a failure."
            )
        if not isinstance(o, (BNode, Literal, URIRef)):
            logger.append(
                f"Warning: Object {o} should be of type BNode, Literal or URIRef, but is {type(o).__name__}. "
                f"This might lead to a failure."
            )
        yield s, p, o


def get_value_from_graph(
    logger: Logger,
    graph: Graph,
    subject: Optional[Node] = None,
    predicate: Optional[Node] = RDF.value,
    _object: Optional[Node] = None,
    default: Optional[Any] = None,
    _any: Optional[bool] = True,
) -> Optional[Union[URIRef, Literal, BNode]]:
    """Return ``graph.value(...)`` for the given pattern, logging a warning on an unexpected node type.

    The warning is only emitted when the result differs from ``default``, is not ``None`` and is not a
    ``URIRef``, ``Literal`` or ``BNode``. The result is returned unchanged in every case.
    """
    # this is a helper method to cast some rdf types from graph.value() to be compatible with the
    # code that follows
    value = graph.value(subject=subject, predicate=predicate, object=_object, default=default, any=_any)
    if value != default and value is not None and not isinstance(value, (URIRef, Literal, BNode)):
        logger.append(
            f"Warning: Node {value} should be of type BNode, Literal or URIRef, but is {type(value).__name__}. "
            f"This might lead to a failure."
        )
    return value
def
parse_literal( logger: spdx_tools.spdx.parser.logger.Logger, graph: rdflib.graph.Graph, subject: rdflib.term.Node, predicate: rdflib.term.Node, parsing_method: Callable = <function <lambda>>, default: Any = None):
def parse_literal(
    logger: Logger,
    graph: Graph,
    subject: Node,
    predicate: Node,
    parsing_method: Callable = lambda x: x.strip(),
    default: Any = None,
):
    """Look up the unique value of ``predicate`` for ``subject`` and parse it.

    The value is fetched with ``get_unique_value``. A truthy result is handed to
    ``apply_parsing_method_or_log_error`` (which returns ``default`` when parsing fails);
    a falsy result yields ``default`` directly.
    """
    unique_value = get_unique_value(logger, graph, subject, predicate, default)
    if unique_value:
        return apply_parsing_method_or_log_error(logger, unique_value, parsing_method, default)
    return default
def
apply_parsing_method_or_log_error( logger: spdx_tools.spdx.parser.logger.Logger, value: Any, parsing_method: Callable = <function <lambda>>, default: Any = None):
def apply_parsing_method_or_log_error(
    logger: Logger, value: Any, parsing_method: Callable = lambda x: x.strip(), default: Any = None
):
    """Return ``parsing_method(value)``; on a known parsing failure log it and return ``default``.

    ``SPDXParsingError`` contributes all of its messages to ``logger``. ``TypeError`` and ``ValueError``
    contribute their first argument, or a generic message when they were raised without arguments.
    Any other exception propagates to the caller.
    """
    try:
        return parsing_method(value)
    except SPDXParsingError as err:
        logger.extend(err.get_messages())
    except (TypeError, ValueError) as err:
        # An exception raised without arguments has an empty ``args`` tuple; indexing it would raise
        # IndexError and mask the original problem, so fall back to a generic message.
        message = err.args[0] if err.args else f"{type(err).__name__} raised while parsing {value!r}"
        logger.append(message)
    return default
def
parse_literal_or_no_assertion_or_none( logger: spdx_tools.spdx.parser.logger.Logger, graph: rdflib.graph.Graph, subject: rdflib.term.Node, predicate: rdflib.term.Node, parsing_method: Callable = <function <lambda>>, default: Any = None):
def parse_literal_or_no_assertion_or_none(
    logger: Logger,
    graph: Graph,
    subject: Node,
    predicate: Node,
    parsing_method: Callable = lambda x: x.strip(),
    default: Any = None,
):
    """Fetch the unique value of ``predicate`` for ``subject`` and convert it.

    The lookup is done by ``get_unique_value``; the conversion (including the
    NOASSERTION / NONE handling) is delegated to ``get_correctly_typed_value``.
    """
    return get_correctly_typed_value(
        logger,
        get_unique_value(logger, graph, subject, predicate, default),
        parsing_method,
        default,
    )
def
get_correctly_typed_value( logger: spdx_tools.spdx.parser.logger.Logger, value: Any, parsing_method: Callable = <function <lambda>>, default: Any = None):
def get_correctly_typed_value(
    logger: Logger, value: Any, parsing_method: Callable = lambda x: x.strip(), default: Any = None
):
    """Convert ``value`` to ``SpdxNoAssertion``, ``SpdxNone`` or the result of ``parsing_method``.

    A falsy ``value`` yields ``default``. Each marker is recognised either as its namespace term
    or as its plain string form (compared against ``value.toPython()``).
    """
    if not value:
        return default

    # (namespace term, plain string form, model class), checked in this order.
    markers = (
        (SPDX_NAMESPACE.noassertion, SPDX_NO_ASSERTION_STRING, SpdxNoAssertion),
        (SPDX_NAMESPACE.none, SPDX_NONE_STRING, SpdxNone),
    )
    for marker_term, marker_string, model_class in markers:
        if value == marker_term or value.toPython() == marker_string:
            return model_class()

    return apply_parsing_method_or_log_error(logger, value, parsing_method, default)
def
get_unique_value( logger: spdx_tools.spdx.parser.logger.Logger, graph: rdflib.graph.Graph, subject: rdflib.term.Node, predicate: rdflib.term.Node, default: Any) -> Any:
def get_unique_value(logger: Logger, graph: Graph, subject: Node, predicate: Node, default: Any) -> Any:
    """Return ``graph.value(...)`` for ``subject``/``predicate`` with ``any=False``.

    If the lookup raises ``UniquenessError``, a message is appended to ``logger`` and
    ``default`` is returned instead.
    """
    try:
        return graph.value(subject=subject, predicate=predicate, default=default, any=False)
    except UniquenessError:
        logger.append(f"Multiple values for unique value {predicate} found.")
    return default
def
parse_enum_value(enum_str: str, enum_class: type[enum.Enum], prefix: str) -> enum.Enum:
def parse_enum_value(enum_str: str, enum_class: Type[Enum], prefix: str) -> Enum:
    """Translate ``enum_str`` into a member of ``enum_class``.

    ``prefix`` is stripped from the start of ``enum_str``, the remainder is converted with
    ``camel_case_to_snake_case`` and upper-cased, and the result is used as the member name.

    Raises:
        SPDXParsingError: if the lookup raises ``KeyError``; the ``KeyError`` is attached as the cause.
    """
    try:
        enum_without_rdf_prefix = remove_prefix(enum_str, prefix)
        value = camel_case_to_snake_case(enum_without_rdf_prefix).upper()
        return enum_class[value]
    except KeyError as err:
        # Chain explicitly so the traceback shows the failed lookup as the direct cause.
        raise SPDXParsingError([f"Invalid value for {enum_class}: {enum_str}"]) from err
def
parse_spdx_id( resource: Union[rdflib.term.URIRef, rdflib.term.BNode], doc_namespace: str, graph: rdflib.graph.Graph) -> Optional[str]:
def parse_spdx_id(resource: Union[URIRef, BNode], doc_namespace: str, graph: Graph) -> Optional[str]:
    """Derive an id string from ``resource``.

    Returns ``None`` for a falsy resource or a ``BNode``. A resource starting with
    ``"{doc_namespace}#"`` yields its fragment; any other resource containing ``#`` is passed
    through ``NamespaceManager(graph).normalizeUri``; everything else is returned via ``toPython()``.
    """
    if not resource or isinstance(resource, BNode):
        return None

    document_prefix = f"{doc_namespace}#"
    if resource.startswith(document_prefix):
        return resource.fragment

    if "#" not in resource:
        return resource.toPython() or None

    return NamespaceManager(graph).normalizeUri(resource)
def
remove_prefix(string: str, prefix: str) -> str:
def
get_correctly_typed_triples( logger: spdx_tools.spdx.parser.logger.Logger, graph: rdflib.graph.Graph, subject: Optional[rdflib.term.Node] = None, predicate: Optional[rdflib.term.Node] = None, _object: Optional[rdflib.term.Node] = None) -> tuple[typing.Union[rdflib.term.BNode, rdflib.term.URIRef], rdflib.term.Node, typing.Union[rdflib.term.BNode, rdflib.term.Literal, rdflib.term.URIRef]]:
def get_correctly_typed_triples(
    logger: Logger,
    graph: Graph,
    subject: Optional[Node] = None,
    predicate: Optional[Node] = None,
    _object: Optional[Node] = None,
) -> Tuple[Union[BNode, URIRef], Node, Union[BNode, Literal, URIRef]]:
    """Yield every triple of ``graph`` matching the given pattern, logging unexpected node types.

    A warning is appended to ``logger`` when a subject is not a ``BNode``/``URIRef`` or an object
    is not a ``BNode``/``Literal``/``URIRef``. The triple is yielded unchanged either way.

    NOTE(review): this function is a generator; the return annotation describes a single yielded
    triple rather than the iterator that is actually returned.
    """
    # Helper that checks the rdf types coming out of graph.triples() so that they are compatible
    # with the code that follows.
    pattern = (subject, predicate, _object)
    for triple_subject, triple_predicate, triple_object in graph.triples(pattern):
        subject_has_expected_type = isinstance(triple_subject, (BNode, URIRef))
        if not subject_has_expected_type:
            logger.append(
                f"Warning: Subject {triple_subject} should be of type BNode or URIRef, "
                f"but is {type(triple_subject).__name__}. "
                f"This might lead to a failure."
            )

        object_has_expected_type = isinstance(triple_object, (BNode, Literal, URIRef))
        if not object_has_expected_type:
            logger.append(
                f"Warning: Object {triple_object} should be of type BNode, Literal or URIRef, "
                f"but is {type(triple_object).__name__}. "
                f"This might lead to a failure."
            )

        yield triple_subject, triple_predicate, triple_object
def
get_value_from_graph( logger: spdx_tools.spdx.parser.logger.Logger, graph: rdflib.graph.Graph, subject: Optional[rdflib.term.Node] = None, predicate: Optional[rdflib.term.Node] = rdflib.term.URIRef('http://www.w3.org/1999/02/22-rdf-syntax-ns#value'), _object: Optional[rdflib.term.Node] = None, default: Optional[Any] = None, _any: Optional[bool] = True) -> Union[rdflib.term.URIRef, rdflib.term.Literal, rdflib.term.BNode, NoneType]:
def get_value_from_graph(
    logger: Logger,
    graph: Graph,
    subject: Optional[Node] = None,
    predicate: Optional[Node] = RDF.value,
    _object: Optional[Node] = None,
    default: Optional[Any] = None,
    _any: Optional[bool] = True,
) -> Optional[Union[URIRef, Literal, BNode]]:
    """Return ``graph.value(...)`` for the given pattern, logging a warning on an unexpected node type.

    The warning is only emitted when the result differs from ``default``, is not ``None`` and is
    not a ``URIRef``, ``Literal`` or ``BNode``. The result is returned unchanged in every case.
    """
    # Helper that checks the rdf type coming out of graph.value() so that it is compatible with
    # the code that follows.
    expected_types = (URIRef, Literal, BNode)
    result = graph.value(subject=subject, predicate=predicate, object=_object, default=default, any=_any)
    if result != default and result is not None:
        if not isinstance(result, expected_types):
            logger.append(
                f"Warning: Node {result} should be of type BNode, Literal or URIRef, "
                f"but is {type(result).__name__}. "
                f"This might lead to a failure."
            )
    return result