# Source code for ontocheck.task_based_metric

"""
Task-Based Ontology Assessment Metric

Evaluates an ontology against a set of competency questions (encoded as SPARQL
queries) by computing term-overlap metrics. For each question set, two scores
are produced:

    Relevance (Recall)    = |T_a intersection T_o| / |T_a|
    Accuracy  (Precision) = |T_a intersection T_o| / |T_o|

where T_a is the set of domain terms referenced in the SPARQL queries (the
"task vocabulary") and T_o is the set of domain terms defined in the ontology.

Questions can be supplied as:
    - A path to a JSON file where each item contains a ``sparql_query`` key.
    - A path to a Markdown file with SPARQL queries inside ``sparql`` blocks.
    - A plain list of SPARQL query strings.
"""

import re
import json
from pathlib import Path

from rdflib import Graph, URIRef
from rdflib.namespace import RDF, RDFS, OWL, XSD


# ---------------------------------------------------------------------------
# Foundational namespace filter
# ---------------------------------------------------------------------------

# Namespace URI prefixes treated as general-purpose vocabulary.  The first
# part stringifies the rdflib namespace objects imported above; the second
# part lists the remaining namespaces (SKOS, Dublin Core terms, QUDT schema
# and units, XML) as literal URI strings.
_FOUNDATIONAL_NS = {str(namespace) for namespace in (RDF, RDFS, OWL, XSD)} | {
    "http://www.w3.org/2004/02/skos/core#",
    "http://purl.org/dc/terms/",
    "http://qudt.org/schema/qudt/",
    "http://qudt.org/vocab/unit/",
    "http://www.w3.org/XML/1998/namespace",
}


def _is_foundational(uri_str):
    """
    Tell whether a URI lives in one of the foundational namespaces.

    A URI is considered foundational when its string form begins with any
    entry of ``_FOUNDATIONAL_NS``.  Such URIs are treated as general-purpose
    vocabulary rather than domain-specific concepts.

    Parameters
    ----------
    uri_str : str
        The full URI string to check.

    Returns
    -------
    bool
        True if *uri_str* starts with at least one foundational namespace
        prefix, False otherwise.
    """
    return any(uri_str.startswith(namespace) for namespace in _FOUNDATIONAL_NS)


def _get_local_name(uri_str):
    """
    Extract the local name fragment from a URI.

    Splits on ``#`` first; if the fragment itself contains ``/``, a second
    split is performed.  Falls back to splitting on ``/`` when no ``#`` is
    present.

    Parameters
    ----------
    uri_str : str
        The full URI string.

    Returns
    -------
    str
        The local name portion of the URI.
    """
    uri_str = str(uri_str)
    if "#" in uri_str:
        fragment = uri_str.rsplit("#", 1)[-1]
        if "/" in fragment:
            return fragment.rsplit("/", 1)[-1]
        return fragment
    elif "/" in uri_str:
        return uri_str.rsplit("/", 1)[-1]
    return uri_str


# ---------------------------------------------------------------------------
# Ontology term extraction
# ---------------------------------------------------------------------------

def _get_ontology_terms(ttl_files, domain_ns_fragments=None):
    """
    Parse one or more Turtle files and return the set of domain term local
    names (T_o).

    Terms are discovered through three complementary SPARQL queries:

    1. Entities explicitly typed as ``owl:Class``, ``rdfs:Class``,
       ``owl:ObjectProperty``, ``owl:DatatypeProperty``, or ``rdf:Property``.
    2. IRI subjects that carry an ``rdfs:label``.
    3. IRI subjects of ``rdfs:domain`` or ``rdfs:range`` declarations.

    Foundational-namespace URIs are always excluded.  When
    *domain_ns_fragments* is provided, only URIs whose string representation
    contains at least one of the given fragments are retained.  Terms whose
    local name is empty or whitespace-only are dropped.

    Parameters
    ----------
    ttl_files : list of str or list of pathlib.Path
        Paths to Turtle (.ttl) ontology files.  All files are merged into a
        single graph before querying.
    domain_ns_fragments : list of str, str, or None, optional
        Namespace URI sub-strings used to restrict results to domain-specific
        terms.  A single string is treated as a one-element list.  If
        ``None`` (or empty), all non-foundational terms are included.

    Returns
    -------
    set of str
        Local names of the ontology's domain terms.
    """
    # A bare string would otherwise be iterated character by character,
    # turning the namespace filter into a near-vacuous single-character test.
    if isinstance(domain_ns_fragments, str):
        domain_ns_fragments = [domain_ns_fragments]

    g = Graph()
    for ttl_path in ttl_files:
        g.parse(str(ttl_path), format="turtle")

    # The queries below use the rdf:/rdfs:/owl: prefixes without declaring
    # them.  Bind them explicitly so the queries do not depend on which
    # prefixes the parsed files (or the rdflib version's defaults) happen to
    # register, nor on an ontology rebinding one of them to another URI.
    query_ns = {"rdf": RDF, "rdfs": RDFS, "owl": OWL}

    queries = [
        """
        SELECT DISTINCT ?term WHERE {
            { ?term a owl:Class } UNION
            { ?term a rdfs:Class } UNION
            { ?term a owl:ObjectProperty } UNION
            { ?term a owl:DatatypeProperty } UNION
            { ?term a rdf:Property }
        }
        """,
        """
        SELECT DISTINCT ?term WHERE {
            ?term rdfs:label ?label .
            FILTER(isIRI(?term))
        }
        """,
        """
        SELECT DISTINCT ?term WHERE {
            { ?term rdfs:domain ?d } UNION
            { ?term rdfs:range ?r }
            FILTER(isIRI(?term))
        }
        """,
    ]

    ontology_terms = set()
    for q in queries:
        for row in g.query(q, initNs=query_ns):
            term = row.term
            # The first query has no isIRI filter, so blank-node classes
            # (e.g. anonymous restrictions) must be skipped here.
            if not isinstance(term, URIRef):
                continue
            uri = str(term)
            if _is_foundational(uri):
                continue
            if domain_ns_fragments and not any(
                frag in uri for frag in domain_ns_fragments
            ):
                continue
            local = _get_local_name(uri)
            # URIs ending in "#" or "/" yield an empty local name.
            if local.strip():
                ontology_terms.add(local)

    return ontology_terms


# ---------------------------------------------------------------------------
# SPARQL term extraction
# ---------------------------------------------------------------------------

def _extract_terms_from_sparql(sparql_query, domain_prefixes):
    """
    Extract prefixed local names from a SPARQL query string.

    For each prefix in *domain_prefixes*, a regex search finds all occurrences
    of ``prefix:LocalName`` and collects the local name parts.

    Parameters
    ----------
    sparql_query : str
        A single SPARQL query string.
    domain_prefixes : list of str
        Namespace prefixes to scan for (e.g., ``["mds"]``).

    Returns
    -------
    set of str
        Local names referenced in the query under the given prefixes.
    """
    terms = set()
    for prefix in domain_prefixes:
        pattern = rf'{re.escape(prefix)}:([A-Za-z_][A-Za-z0-9_]*)'
        matches = re.findall(pattern, sparql_query)
        terms.update(matches)
    return terms


# ---------------------------------------------------------------------------
# Question loaders
# ---------------------------------------------------------------------------

def _load_json_questions(json_path):
    """
    Load SPARQL queries from a JSON competency-question file.

    Each element of the JSON array is expected to contain a ``sparql_query``
    key whose value is a SPARQL query string.  Elements without this key are
    silently skipped.

    Parameters
    ----------
    json_path : str or pathlib.Path
        Path to the JSON file.

    Returns
    -------
    list of str
        SPARQL query strings extracted from the file.
    """
    with open(json_path, "r", encoding="utf-8") as f:
        data = json.load(f)
    queries = []
    for item in data:
        q = item.get("sparql_query", "")
        if q:
            queries.append(q)
    return queries


def _extract_sparql_from_markdown(md_path):
    """
    Extract SPARQL queries from fenced code blocks in a Markdown file.

    Looks for blocks delimited by ````sparql`` and the closing ``````` and
    returns the content of each block as a separate string.

    Parameters
    ----------
    md_path : str or pathlib.Path
        Path to the Markdown file.

    Returns
    -------
    list of str
        SPARQL query strings found in the file.
    """
    with open(md_path, "r", encoding="utf-8") as f:
        content = f.read()
    return re.findall(r"```sparql\s*(.*?)```", content, re.DOTALL)


# ---------------------------------------------------------------------------
# Main Function
# ---------------------------------------------------------------------------

def _resolve_sparql_queries(questions):
    """Turn the *questions* argument into a list of SPARQL query strings."""
    if isinstance(questions, (str, Path)):
        qs = str(questions)
        # Match the extension case-insensitively so "CQ.JSON" / "cq.MD" work.
        lowered = qs.lower()
        if lowered.endswith(".json"):
            return _load_json_questions(qs)
        if lowered.endswith(".md"):
            return _extract_sparql_from_markdown(qs)
        raise ValueError(
            f"Unrecognised question file extension: {qs!r}. "
            "Expected .json or .md, or pass a list of SPARQL strings."
        )
    if isinstance(questions, (list, tuple)):
        return list(questions)
    raise ValueError(
        "The 'questions' argument must be a file path (str/Path to .json "
        "or .md) or a list of SPARQL query strings."
    )


def task_based_metric_v_0_0_1(ttl_file, questions, domain_prefixes,
                              domain_ns_fragments=None):
    """
    Compute task-based Relevance and Accuracy for an ontology.

    Given an ontology (one or more Turtle files) and a set of competency
    questions expressed as SPARQL queries, this function computes two
    term-overlap metrics:

        Relevance (Recall)    = |T_a intersection T_o| / |T_a|
        Accuracy  (Precision) = |T_a intersection T_o| / |T_o|

    where *T_a* is the union of domain terms extracted from all SPARQL
    queries and *T_o* is the set of domain terms defined in the ontology.
    Either score is reported as ``0.0`` when its denominator set is empty.

    Parameters
    ----------
    ttl_file : str, pathlib.Path, or list thereof
        Path(s) to Turtle (.ttl) ontology file(s).  A single string or
        ``Path`` is automatically wrapped in a list.
    questions : str, pathlib.Path, or list of str
        The competency questions to evaluate against.  Accepted forms:

        * **str / Path ending in .json** -- path to a JSON file where each
          array element has a ``sparql_query`` key.
        * **str / Path ending in .md** -- path to a Markdown file with
          SPARQL queries inside fenced ``sparql`` code blocks.
        * **list (or tuple) of str** -- raw SPARQL query strings.

        File extensions are matched case-insensitively.
    domain_prefixes : list of str
        Namespace prefixes used in the SPARQL queries to identify domain
        terms (e.g., ``["mds"]``).
    domain_ns_fragments : list of str or None, optional
        Sub-strings of namespace URIs used to restrict which ontology terms
        count as domain-specific.  When ``None``, every non-foundational
        term is included.

    Returns
    -------
    dict
        A dictionary with the following keys:

        - ``relevance`` (float): Recall -- fraction of task terms present
          in the ontology.
        - ``accuracy`` (float): Precision -- fraction of ontology terms
          referenced by the tasks.
        - ``T_o_count`` (int): Number of ontology domain terms.
        - ``T_a_count`` (int): Number of unique task terms.
        - ``intersection`` (int): Number of terms in both sets.
        - ``missing_from_onto`` (set of str): Task terms absent from the
          ontology.
        - ``unused_in_onto`` (set of str): Ontology terms not referenced by
          any task query.

    Raises
    ------
    ValueError
        If *questions* is not a recognized type (list, JSON path, or
        Markdown path).

    Examples
    --------
    >>> result = task_based_metric_v_0_0_1(  # doctest: +SKIP
    ...     ttl_file="my_ontology.ttl",
    ...     questions="competency_questions.json",
    ...     domain_prefixes=["mds"],
    ...     domain_ns_fragments=["cwrusdle.bitbucket.io/mds"],
    ... )
    >>> print(f"Relevance: {result['relevance']:.2%}")  # doctest: +SKIP
    >>> print(f"Accuracy: {result['accuracy']:.2%}")  # doctest: +SKIP
    """
    # Normalise ttl_file to a list
    if isinstance(ttl_file, (str, Path)):
        ttl_files = [ttl_file]
    else:
        ttl_files = list(ttl_file)

    # Build ontology term set (T_o)
    T_o = _get_ontology_terms(ttl_files, domain_ns_fragments)

    # Build task term set (T_a) from SPARQL queries
    sparql_queries = _resolve_sparql_queries(questions)
    T_a = set()
    for q in sparql_queries:
        T_a.update(_extract_terms_from_sparql(q, domain_prefixes))

    # Compute metrics; an empty denominator set yields 0.0 rather than a
    # ZeroDivisionError.
    intersection = T_a & T_o
    i_count = len(intersection)
    relevance = (i_count / len(T_a)) if T_a else 0.0
    accuracy = (i_count / len(T_o)) if T_o else 0.0

    return {
        "relevance": relevance,
        "accuracy": accuracy,
        "T_o_count": len(T_o),
        "T_a_count": len(T_a),
        "intersection": i_count,
        "missing_from_onto": T_a - T_o,
        "unused_in_onto": T_o - T_a,
    }