spdx_tools.spdx.parser.rdf.graph_parsing_functions

  1# SPDX-FileCopyrightText: 2023 spdx contributors
  2#
  3# SPDX-License-Identifier: Apache-2.0
from enum import Enum

from beartype.typing import Any, Callable, Iterator, Optional, Tuple, Type, Union
from rdflib import RDF, Graph, URIRef
from rdflib.exceptions import UniquenessError
from rdflib.namespace import NamespaceManager
from rdflib.term import BNode, Literal, Node

from spdx_tools.spdx.casing_tools import camel_case_to_snake_case
from spdx_tools.spdx.model import SpdxNoAssertion, SpdxNone
from spdx_tools.spdx.model.spdx_no_assertion import SPDX_NO_ASSERTION_STRING
from spdx_tools.spdx.model.spdx_none import SPDX_NONE_STRING
from spdx_tools.spdx.parser.error import SPDXParsingError
from spdx_tools.spdx.parser.logger import Logger
from spdx_tools.spdx.rdfschema.namespace import SPDX_NAMESPACE
 19
 20
 21def parse_literal(
 22    logger: Logger,
 23    graph: Graph,
 24    subject: Node,
 25    predicate: Node,
 26    parsing_method: Callable = lambda x: x.strip(),
 27    default: Any = None,
 28):
 29    value = get_unique_value(logger, graph, subject, predicate, default)
 30    if not value:
 31        return default
 32    return apply_parsing_method_or_log_error(logger, value, parsing_method, default)
 33
 34
 35def apply_parsing_method_or_log_error(
 36    logger: Logger, value: Any, parsing_method: Callable = lambda x: x.strip(), default: Any = None
 37):
 38    try:
 39        return parsing_method(value)
 40    except SPDXParsingError as err:
 41        logger.extend(err.get_messages())
 42    except (TypeError, ValueError) as err:
 43        logger.append(err.args[0])
 44    return default
 45
 46
 47def parse_literal_or_no_assertion_or_none(
 48    logger: Logger,
 49    graph: Graph,
 50    subject: Node,
 51    predicate: Node,
 52    parsing_method: Callable = lambda x: x.strip(),
 53    default: Any = None,
 54):
 55    value = get_unique_value(logger, graph, subject, predicate, default)
 56    return get_correctly_typed_value(logger, value, parsing_method, default)
 57
 58
 59def get_correctly_typed_value(
 60    logger: Logger, value: Any, parsing_method: Callable = lambda x: x.strip(), default: Any = None
 61):
 62    if not value:
 63        return default
 64    if value == SPDX_NAMESPACE.noassertion or value.toPython() == SPDX_NO_ASSERTION_STRING:
 65        return SpdxNoAssertion()
 66    if value == SPDX_NAMESPACE.none or value.toPython() == SPDX_NONE_STRING:
 67        return SpdxNone()
 68    return apply_parsing_method_or_log_error(logger, value, parsing_method, default)
 69
 70
 71def get_unique_value(logger: Logger, graph: Graph, subject: Node, predicate: Node, default: Any) -> Any:
 72    try:
 73        value = graph.value(subject=subject, predicate=predicate, default=default, any=False)
 74        return value
 75    except UniquenessError:
 76        logger.append(f"Multiple values for unique value {predicate} found.")
 77        return default
 78
 79
 80def parse_enum_value(enum_str: str, enum_class: Type[Enum], prefix: str) -> Enum:
 81    try:
 82        enum_without_rdf_prefix = remove_prefix(enum_str, prefix)
 83        value = camel_case_to_snake_case(enum_without_rdf_prefix).upper()
 84        return enum_class[value]
 85    except KeyError:
 86        raise SPDXParsingError([f"Invalid value for {enum_class}: {enum_str}"])
 87
 88
 89def parse_spdx_id(resource: Union[URIRef, BNode], doc_namespace: str, graph: Graph) -> Optional[str]:
 90    if not resource or isinstance(resource, BNode):
 91        return None
 92    if resource.startswith(f"{doc_namespace}#"):
 93        return resource.fragment
 94    if "#" in resource:
 95        namespace_manager = NamespaceManager(graph)
 96        return namespace_manager.normalizeUri(resource)
 97    return resource.toPython() or None
 98
 99
100# Python 3.9 introduced the method removeprefix() for strings, but as we are also supporting Python 3.7 and 3.8 we need
101# to write our own helper method to delete prefixes.
def remove_prefix(string: str, prefix: str) -> str:
    """Return ``string`` without a leading ``prefix``; return it unchanged if it does not start with ``prefix``."""
    return string[len(prefix) :] if string.startswith(prefix) else string
106
107
def get_correctly_typed_triples(
    logger: Logger,
    graph: Graph,
    subject: Optional[Node] = None,
    predicate: Optional[Node] = None,
    _object: Optional[Node] = None,
) -> Iterator[Tuple[Union[BNode, URIRef], Node, Union[BNode, Literal, URIRef]]]:
    """Yield each triple from ``graph.triples((subject, predicate, _object))``, logging unexpected node types.

    This is a generator, so the return annotation describes an iterator of triples rather than a single
    triple. No conversion is performed: a warning is appended to ``logger`` when a subject is not a
    ``BNode``/``URIRef`` or an object is not a ``BNode``/``Literal``/``URIRef``, and the triple is
    yielded unchanged either way.
    """
    # this is a helper method that narrows the rdf types coming from graph.triples() to the ones the
    # code that follows expects
    for s, p, o in graph.triples((subject, predicate, _object)):
        if not isinstance(s, (BNode, URIRef)):
            logger.append(
                f"Warning: Subject {s} should be of type BNode or URIRef, but is {type(s).__name__}. "
                f"This might lead to a failure."
            )
        if not isinstance(o, (BNode, Literal, URIRef)):
            logger.append(
                f"Warning: Object {o} should be of type BNode, Literal or URIRef, but is {type(o).__name__}. "
                f"This might lead to a failure."
            )
        yield s, p, o
129
130
def get_value_from_graph(
    logger: Logger,
    graph: Graph,
    subject: Optional[Node] = None,
    predicate: Optional[Node] = RDF.value,
    _object: Optional[Node] = None,
    default: Optional[Any] = None,
    _any: Optional[bool] = True,
) -> Optional[Union[URIRef, Literal, BNode]]:
    """Return ``graph.value(...)`` for the given pattern, logging a warning on an unexpected node type.

    All lookup arguments are forwarded to ``graph.value`` (``_object`` as ``object``, ``_any`` as
    ``any``). A warning is appended to ``logger`` when the result differs from ``default``, is not
    ``None`` and is not a ``URIRef``, ``Literal`` or ``BNode``. The result is returned unchanged.
    """
    expected_types = (URIRef, Literal, BNode)
    found = graph.value(subject=subject, predicate=predicate, object=_object, default=default, any=_any)
    if found != default and found is not None and not isinstance(found, expected_types):
        warning = (
            f"Warning: Node {found} should be of type BNode, Literal or URIRef, but is {type(found).__name__}. "
            f"This might lead to a failure."
        )
        logger.append(warning)
    return found
def parse_literal( logger: spdx_tools.spdx.parser.logger.Logger, graph: rdflib.graph.Graph, subject: rdflib.term.Node, predicate: rdflib.term.Node, parsing_method: collections.abc.Callable = <function <lambda>>, default: Any = None):
def parse_literal(
    logger: Logger,
    graph: Graph,
    subject: Node,
    predicate: Node,
    parsing_method: Callable = lambda x: x.strip(),
    default: Any = None,
):
    """Look up the single object of ``(subject, predicate)`` in ``graph`` and parse it.

    The lookup is delegated to ``get_unique_value``. A falsy result makes this function return
    ``default`` without calling ``parsing_method``; any other result is handed to
    ``apply_parsing_method_or_log_error`` together with ``parsing_method`` and ``default``.
    """
    node = get_unique_value(logger, graph, subject, predicate, default)
    if node:
        return apply_parsing_method_or_log_error(logger, node, parsing_method, default)
    return default
def apply_parsing_method_or_log_error( logger: spdx_tools.spdx.parser.logger.Logger, value: Any, parsing_method: collections.abc.Callable = <function <lambda>>, default: Any = None):
def apply_parsing_method_or_log_error(
    logger: Logger, value: Any, parsing_method: Callable = lambda x: x.strip(), default: Any = None
):
    """Return ``parsing_method(value)``, or log the failure and return ``default``.

    Args:
        logger: collector for error messages; ``extend`` and ``append`` are called on it.
        value: the object handed to ``parsing_method``.
        parsing_method: callable applied to ``value``; strips whitespace by default.
        default: returned when ``parsing_method`` raises one of the handled exceptions.

    An ``SPDXParsingError`` contributes all of its messages to ``logger``; a ``TypeError`` or
    ``ValueError`` contributes a single entry. Other exceptions propagate to the caller.
    """
    try:
        return parsing_method(value)
    except SPDXParsingError as err:
        logger.extend(err.get_messages())
    except (TypeError, ValueError) as err:
        # An exception raised without arguments has an empty ``args`` tuple; indexing it would raise
        # an IndexError out of this handler, so fall back to the exception's repr in that case.
        logger.append(err.args[0] if err.args else repr(err))
    return default
def parse_literal_or_no_assertion_or_none( logger: spdx_tools.spdx.parser.logger.Logger, graph: rdflib.graph.Graph, subject: rdflib.term.Node, predicate: rdflib.term.Node, parsing_method: collections.abc.Callable = <function <lambda>>, default: Any = None):
def parse_literal_or_no_assertion_or_none(
    logger: Logger,
    graph: Graph,
    subject: Node,
    predicate: Node,
    parsing_method: Callable = lambda x: x.strip(),
    default: Any = None,
):
    """Look up the single object of ``(subject, predicate)`` and convert it with ``get_correctly_typed_value``.

    The node is fetched through ``get_unique_value`` and passed on unchanged, together with
    ``parsing_method`` and ``default``; the result of ``get_correctly_typed_value`` is returned as is.
    """
    return get_correctly_typed_value(
        logger,
        get_unique_value(logger, graph, subject, predicate, default),
        parsing_method,
        default,
    )
def get_correctly_typed_value( logger: spdx_tools.spdx.parser.logger.Logger, value: Any, parsing_method: collections.abc.Callable = <function <lambda>>, default: Any = None):
def get_correctly_typed_value(
    logger: Logger, value: Any, parsing_method: Callable = lambda x: x.strip(), default: Any = None
):
    """Turn ``value`` into ``SpdxNoAssertion``, ``SpdxNone`` or a parsed value.

    A falsy ``value`` yields ``default``. A value that equals the ``noassertion`` / ``none`` term of
    ``SPDX_NAMESPACE``, or whose ``toPython()`` result equals the corresponding marker string, yields a
    new ``SpdxNoAssertion`` / ``SpdxNone`` instance. Everything else is passed to
    ``apply_parsing_method_or_log_error``.
    """
    if not value:
        return default

    def _is_marker(marker_node: Any, marker_string: str) -> bool:
        # Compare against the namespace term first; toPython() is only consulted if that fails.
        return value == marker_node or value.toPython() == marker_string

    if _is_marker(SPDX_NAMESPACE.noassertion, SPDX_NO_ASSERTION_STRING):
        return SpdxNoAssertion()
    if _is_marker(SPDX_NAMESPACE.none, SPDX_NONE_STRING):
        return SpdxNone()
    return apply_parsing_method_or_log_error(logger, value, parsing_method, default)
def get_unique_value( logger: spdx_tools.spdx.parser.logger.Logger, graph: rdflib.graph.Graph, subject: rdflib.term.Node, predicate: rdflib.term.Node, default: Any) -> Any:
def get_unique_value(logger: Logger, graph: Graph, subject: Node, predicate: Node, default: Any) -> Any:
    """Return the result of ``graph.value`` for ``(subject, predicate)``, queried with ``any=False``.

    ``default`` is forwarded to ``graph.value``. If the lookup raises ``UniquenessError``, a message
    naming ``predicate`` is appended to ``logger`` and ``default`` is returned instead.
    """
    try:
        return graph.value(subject=subject, predicate=predicate, default=default, any=False)
    except UniquenessError:
        logger.append(f"Multiple values for unique value {predicate} found.")
    return default
def parse_enum_value(enum_str: str, enum_class: type[enum.Enum], prefix: str) -> enum.Enum:
def parse_enum_value(enum_str: str, enum_class: Type[Enum], prefix: str) -> Enum:
    """Map an RDF enum string onto a member of ``enum_class``.

    ``prefix`` is removed from the start of ``enum_str`` when present, the remainder is passed through
    ``camel_case_to_snake_case`` and upper-cased, and the result is used as the member name.

    Raises:
        SPDXParsingError: if a ``KeyError`` occurs during the lookup, i.e. ``enum_class`` has no member
            with the derived name. The ``KeyError`` is attached as ``__cause__``.
    """
    try:
        enum_without_rdf_prefix = remove_prefix(enum_str, prefix)
        value = camel_case_to_snake_case(enum_without_rdf_prefix).upper()
        return enum_class[value]
    except KeyError as err:
        # Chain explicitly so the traceback reports the KeyError as the direct cause rather than as an
        # unrelated error that occurred while handling it.
        raise SPDXParsingError([f"Invalid value for {enum_class}: {enum_str}"]) from err
def parse_spdx_id( resource: Union[rdflib.term.URIRef, rdflib.term.BNode], doc_namespace: str, graph: rdflib.graph.Graph) -> Optional[str]:
def parse_spdx_id(resource: Union[URIRef, BNode], doc_namespace: str, graph: Graph) -> Optional[str]:
    """Derive an SPDX id string from an RDF resource.

    Returns ``None`` for a falsy resource or a ``BNode``. A resource starting with
    ``"<doc_namespace>#"`` is reduced to its ``fragment``. Any other resource containing ``#`` is passed
    through ``NamespaceManager(graph).normalizeUri``. Everything else is returned as
    ``resource.toPython()``, with a falsy result replaced by ``None``.
    """
    if not resource or isinstance(resource, BNode):
        return None

    own_prefix = f"{doc_namespace}#"
    if resource.startswith(own_prefix):
        spdx_id = resource.fragment
    elif "#" in resource:
        spdx_id = NamespaceManager(graph).normalizeUri(resource)
    else:
        spdx_id = resource.toPython() or None
    return spdx_id
def remove_prefix(string: str, prefix: str) -> str:
def remove_prefix(string: str, prefix: str) -> str:
    """Return ``string`` without a leading ``prefix``; return it unchanged if it does not start with ``prefix``."""
    return string[len(prefix) :] if string.startswith(prefix) else string
def get_correctly_typed_triples( logger: spdx_tools.spdx.parser.logger.Logger, graph: rdflib.graph.Graph, subject: Optional[rdflib.term.Node] = None, predicate: Optional[rdflib.term.Node] = None, _object: Optional[rdflib.term.Node] = None) -> tuple[typing.Union[rdflib.term.BNode, rdflib.term.URIRef], rdflib.term.Node, typing.Union[rdflib.term.BNode, rdflib.term.Literal, rdflib.term.URIRef]]:
def get_correctly_typed_triples(
    logger: Logger,
    graph: Graph,
    subject: Optional[Node] = None,
    predicate: Optional[Node] = None,
    _object: Optional[Node] = None,
) -> Iterator[Tuple[Union[BNode, URIRef], Node, Union[BNode, Literal, URIRef]]]:
    """Yield each triple from ``graph.triples((subject, predicate, _object))``, logging unexpected node types.

    This is a generator, so the return annotation describes an iterator of triples rather than a single
    triple. No conversion is performed: a warning is appended to ``logger`` when a subject is not a
    ``BNode``/``URIRef`` or an object is not a ``BNode``/``Literal``/``URIRef``, and the triple is
    yielded unchanged either way.
    """
    # this is a helper method that narrows the rdf types coming from graph.triples() to the ones the
    # code that follows expects
    for s, p, o in graph.triples((subject, predicate, _object)):
        if not isinstance(s, (BNode, URIRef)):
            logger.append(
                f"Warning: Subject {s} should be of type BNode or URIRef, but is {type(s).__name__}. "
                f"This might lead to a failure."
            )
        if not isinstance(o, (BNode, Literal, URIRef)):
            logger.append(
                f"Warning: Object {o} should be of type BNode, Literal or URIRef, but is {type(o).__name__}. "
                f"This might lead to a failure."
            )
        yield s, p, o
def get_value_from_graph( logger: spdx_tools.spdx.parser.logger.Logger, graph: rdflib.graph.Graph, subject: Optional[rdflib.term.Node] = None, predicate: Optional[rdflib.term.Node] = rdflib.term.URIRef('http://www.w3.org/1999/02/22-rdf-syntax-ns#value'), _object: Optional[rdflib.term.Node] = None, default: Optional[Any] = None, _any: Optional[bool] = True) -> Union[rdflib.term.URIRef, rdflib.term.Literal, rdflib.term.BNode, NoneType]:
def get_value_from_graph(
    logger: Logger,
    graph: Graph,
    subject: Optional[Node] = None,
    predicate: Optional[Node] = RDF.value,
    _object: Optional[Node] = None,
    default: Optional[Any] = None,
    _any: Optional[bool] = True,
) -> Optional[Union[URIRef, Literal, BNode]]:
    """Return ``graph.value(...)`` for the given pattern, logging a warning on an unexpected node type.

    All lookup arguments are forwarded to ``graph.value`` (``_object`` as ``object``, ``_any`` as
    ``any``). A warning is appended to ``logger`` when the result differs from ``default``, is not
    ``None`` and is not a ``URIRef``, ``Literal`` or ``BNode``. The result is returned unchanged.
    """
    expected_types = (URIRef, Literal, BNode)
    found = graph.value(subject=subject, predicate=predicate, object=_object, default=default, any=_any)
    if found != default and found is not None and not isinstance(found, expected_types):
        warning = (
            f"Warning: Node {found} should be of type BNode, Literal or URIRef, but is {type(found).__name__}. "
            f"This might lead to a failure."
        )
        logger.append(warning)
    return found