"""Provenance tracking for financial calculations.
This module provides the core data structures and utilities for tracking
the provenance (lineage) of financial calculations. Every FinancialValue
can maintain a complete record of how it was computed.
"""
from __future__ import annotations
import hashlib
import weakref
from collections import defaultdict
from collections.abc import Generator
from contextlib import contextmanager
from contextvars import ContextVar, Token
from dataclasses import dataclass
from decimal import Decimal
from typing import TYPE_CHECKING, Any
try:
from weakref import WeakSet
except ImportError:
# Fallback for older Python versions
WeakSet = set
if TYPE_CHECKING:
from .policy import Policy
from .value import FinancialValue
# Import configuration system
try:
    from .provenance_config import (
        get_config,
        log_provenance_error,
        should_fail_on_error,
        should_track_literals,
        should_track_operations,
        should_track_provenance,
    )
except ImportError:
    # The configuration module could not be imported. Install permissive
    # stand-ins so the rest of this module keeps working: no config object,
    # errors are dropped, every kind of tracking is on, and errors are never
    # escalated.

    def get_config():
        """Report that no configuration object is available."""
        return None

    def log_provenance_error(error, context="", **metadata):
        """Accept an error report and discard it."""
        return None

    def should_fail_on_error():
        """Errors are never re-raised in fallback mode."""
        return False

    def should_track_provenance():
        """Provenance tracking is always on in fallback mode."""
        return True

    def should_track_literals():
        """Literal tracking is always on in fallback mode."""
        return True

    def should_track_operations():
        """Operation tracking is always on in fallback mode."""
        return True
# ======================== Performance Optimizations ========================
# Provenance ID interning: maps each ID string to the first string object seen
# with that value, so later equal IDs can share it (see intern_provenance_id).
_interned_ids: dict[str, str] = {}
# Guards _interned_ids; replaced with a threading.RLock by
# _init_performance_optimizations() when the threading module can be imported.
_id_intern_lock = None # Will be initialized if threading is available
# Hash cache: maps the textual cache key of a literal/operation to its digest.
_hash_cache: dict[str, str] = {}
# Lookup counters maintained by _get_cached_hash() and reset by clear_caches().
_cache_hits = 0
_cache_misses = 0
# Weak reference tracking for memory management; replaced with a WeakSet by
# _init_performance_optimizations().
_active_provenance_refs: WeakSet = None # Will be initialized if weakref is available
# History truncation tracking: per-ID counter incremented by
# _should_truncate_history() each time the ID is checked.
_provenance_history: dict[str, int] = defaultdict(int)
# NOTE(review): not referenced anywhere in the visible part of this module;
# the effective limit is read from config.max_history_depth — confirm usage.
_max_history_entries = 1000
def _init_performance_optimizations():
    """Create the module-level interning lock and weak-reference set.

    Each component is set up independently; when one cannot be created the
    corresponding global is left as ``None`` so callers can detect that the
    feature is unavailable.
    """
    global _id_intern_lock, _active_provenance_refs

    # Re-entrant lock protecting the ID interning table.
    try:
        from threading import RLock

        lock = RLock()
    except ImportError:
        lock = None
    _id_intern_lock = lock

    # Weak set used to observe live Provenance instances.
    try:
        live_refs = WeakSet()
    except Exception:
        live_refs = None
    _active_provenance_refs = live_refs
def intern_provenance_id(prov_id: str) -> str:
    """Return the canonical string object for a provenance ID.

    The first string seen for a given ID value is stored in the module-level
    interning table; later calls with an equal string get that stored object
    back, so duplicates can share one object.

    Args:
        prov_id: The provenance ID to intern

    Returns:
        The interned ID. ``prov_id`` itself is returned unchanged when no
        configuration is available, when ``enable_id_interning`` is off, or
        when interning fails.
    """
    try:
        settings = get_config()
        enabled = settings and getattr(settings, "enable_id_interning", True)
        if not enabled:
            return prov_id

        if _id_intern_lock is None:
            # No lock available: intern without synchronisation.
            return _interned_ids.setdefault(prov_id, prov_id)

        with _id_intern_lock:
            return _interned_ids.setdefault(prov_id, prov_id)
    except Exception as exc:
        log_provenance_error(exc, "intern_provenance_id")
        return prov_id
def _get_cached_hash(cache_key: str) -> str | None:
    """Look up a previously computed hash and update the hit/miss counters.

    Args:
        cache_key: Key for the hash cache

    Returns:
        The cached hash, or None when the key is absent or the lookup fails
    """
    global _cache_hits, _cache_misses
    try:
        if cache_key not in _hash_cache:
            _cache_misses += 1
            return None
        _cache_hits += 1
        return _hash_cache[cache_key]
    except Exception as exc:
        log_provenance_error(exc, "_get_cached_hash")
        return None
def _cache_hash(cache_key: str, hash_value: str) -> str:
    """Store a hash in the cache, evicting old entries when it is full.

    The size limit comes from ``config.max_hash_cache_size`` (10000 when no
    configuration or attribute is available). Once the cache has reached the
    limit, the first quarter-of-the-limit keys in insertion order are dropped
    before the new entry is added.

    Args:
        cache_key: Key for the hash cache
        hash_value: Hash value to cache

    Returns:
        ``hash_value``, whether or not caching succeeded
    """
    try:
        settings = get_config()
        if settings:
            limit = getattr(settings, "max_hash_cache_size", 10000)
        else:
            limit = 10000

        if len(_hash_cache) >= limit:
            # Simple FIFO eviction: dicts iterate in insertion order.
            for stale_key in list(_hash_cache)[: limit // 4]:
                _hash_cache.pop(stale_key, None)

        _hash_cache[cache_key] = hash_value
    except Exception as exc:
        log_provenance_error(exc, "_cache_hash")
    return hash_value
def _register_provenance_ref(prov: Provenance) -> None:
    """Add a provenance instance to the weak-reference tracking set.

    Nothing happens unless a configuration object exists, its
    ``enable_weak_refs`` flag is truthy, and the tracking set was created.
    Failures are logged and otherwise ignored.

    Args:
        prov: Provenance instance to track
    """
    try:
        settings = get_config()
        if settings and getattr(settings, "enable_weak_refs", False):
            if _active_provenance_refs is not None:
                _active_provenance_refs.add(prov)
    except Exception as exc:
        log_provenance_error(exc, "_register_provenance_ref")
def _should_truncate_history(prov_id: str) -> bool:
    """Count one more use of ``prov_id`` and report whether it is over limit.

    The limit is ``config.max_history_depth`` (default 1000). With no
    configuration, or with a non-positive limit, nothing is counted and the
    answer is always False.

    Args:
        prov_id: Provenance ID to check

    Returns:
        True if the ID has now been checked more often than the limit allows,
        False otherwise (including on error)
    """
    try:
        settings = get_config()
        if not settings:
            return False

        depth_limit = getattr(settings, "max_history_depth", 1000)
        if depth_limit <= 0:
            return False

        times_seen = _provenance_history[prov_id] + 1
        _provenance_history[prov_id] = times_seen
        return times_seen > depth_limit
    except Exception as exc:
        log_provenance_error(exc, "_should_truncate_history")
        return False
def get_cache_stats() -> dict[str, Any]:
    """Summarise the state of the provenance caches.

    Returns:
        Dictionary with hit/miss counters, the hit rate as a percentage
        rounded to two decimals, and the sizes of the hash cache, interning
        table and history counter. ``active_provenance_refs`` is included
        only when the weak-reference set exists. On failure the dictionary
        is ``{"error": "stats_unavailable"}``.
    """
    try:
        lookups = _cache_hits + _cache_misses
        if lookups > 0:
            hit_pct = _cache_hits / lookups * 100
        else:
            hit_pct = 0

        report = {
            "cache_hits": _cache_hits,
            "cache_misses": _cache_misses,
            "hit_rate_percent": round(hit_pct, 2),
            "cache_size": len(_hash_cache),
            "interned_ids": len(_interned_ids),
            "history_entries": len(_provenance_history),
        }

        if _active_provenance_refs is not None:
            report["active_provenance_refs"] = len(_active_provenance_refs)

        return report
    except Exception as exc:
        log_provenance_error(exc, "get_cache_stats")
        return {"error": "stats_unavailable"}
def clear_caches() -> None:
    """Empty every provenance cache and zero the hit/miss counters.

    Clears the hash cache, the interning table, the history counters and,
    when it exists, the weak-reference set. Failures are logged, not raised.
    """
    global _cache_hits, _cache_misses
    try:
        for store in (_hash_cache, _interned_ids, _provenance_history):
            store.clear()

        _cache_hits = _cache_misses = 0

        if _active_provenance_refs is not None:
            _active_provenance_refs.clear()
    except Exception as exc:
        log_provenance_error(exc, "clear_caches")
# Run once at import time: creates the ID-interning lock and the weak-reference
# set that the helpers above fall back to None-checks for.
_init_performance_optimizations()
# Use frozendict if available, otherwise fall back to a minimal local stand-in
try:
    from frozendict import frozendict
except ImportError:

    class frozendict(dict):
        """Read-only ``dict`` subclass used when ``frozendict`` is not installed.

        Every in-place mutator raises ``TypeError``. That includes ``|=``:
        ``dict.__ior__`` would otherwise update the mapping in place and
        silently defeat the immutability guarantee. Instances are hashable
        as long as all their values are hashable.
        """

        def __setitem__(self, key, value):
            raise TypeError("frozendict is immutable")

        def __delitem__(self, key):
            raise TypeError("frozendict is immutable")

        def __ior__(self, other):
            # Without this override ``d |= other`` mutates the dict in place.
            raise TypeError("frozendict is immutable")

        def clear(self):
            raise TypeError("frozendict is immutable")

        def pop(self, *args):
            raise TypeError("frozendict is immutable")

        def popitem(self):
            raise TypeError("frozendict is immutable")

        def setdefault(self, key, default=None):
            raise TypeError("frozendict is immutable")

        def update(self, *args, **kwargs):
            raise TypeError("frozendict is immutable")

        def __hash__(self):
            # dict sets __hash__ to None; an immutable mapping can safely be
            # hashed by content. Raises TypeError if a value is unhashable.
            return hash(frozenset(self.items()))
# Context variable holding the stack of active calculation spans (outermost
# first). Each entry is the dict built by _push_calc_context().
# NOTE: the default list is one shared object. The code in this module never
# mutates it in place (_push_calc_context builds a new list with `+`), and it
# must stay that way or spans would leak between contexts.
_current_span_stack: ContextVar[list[dict[str, Any]]] = ContextVar(
    "_current_span_stack", default=[]
)
[docs]
@dataclass(frozen=True)
class Provenance:
    """Immutable provenance record for financial value calculations.

    On construction ``meta`` is wrapped in a ``frozendict`` when necessary,
    the record's own ID and all input IDs are passed through
    ``intern_provenance_id``, and the instance is offered to the
    weak-reference tracker.
    """

    # '__weakref__' must be listed explicitly: a class with __slots__ does
    # not support weak references otherwise, which made every WeakSet.add()
    # and weakref.ref() on a Provenance fail with TypeError.
    __slots__ = ("id", "op", "inputs", "meta", "__weakref__")

    id: str  # Stable hash of operation + operands + policy
    op: str  # Operation identifier ("+", "/", "calc:gross_margin", "literal")
    inputs: tuple[str, ...]  # Child provenance IDs
    meta: frozendict[str, Any]  # Optional metadata (names, tags, constants)

    def __post_init__(self):
        # The dataclass is frozen, so fields are rewritten through
        # object.__setattr__.

        # Ensure meta is immutable
        if not isinstance(self.meta, frozendict):
            object.__setattr__(self, "meta", frozendict(self.meta))

        # Intern the provenance ID for memory efficiency
        interned_id = intern_provenance_id(self.id)
        if interned_id is not self.id:
            object.__setattr__(self, "id", interned_id)

        # Intern input IDs as well. The interned tuple is always stored:
        # interned strings compare equal to the originals, so an equality
        # check can never detect that a different (shared) object came back.
        if self.inputs:
            object.__setattr__(
                self,
                "inputs",
                tuple(intern_provenance_id(input_id) for input_id in self.inputs),
            )

        # Register for weak reference tracking
        _register_provenance_ref(self)

    def __hash__(self):
        # Hash by ID only. The generated field-based hash would include
        # ``meta`` and fail whenever it holds unhashable values (dicts,
        # lists), which also prevents storing the record in a WeakSet.
        # Records that compare equal have equal IDs, so this stays consistent
        # with the generated __eq__.
        return hash(self.id)
[docs]
def hash_literal(value: Decimal | None, policy: Policy) -> str:
    """Generate stable hash for literal values.

    The hash is the SHA-256 digest of ``"literal:<value>:<policy fingerprint>"``
    where the value is the normalized decimal (or ``"None"``). Results are
    memoised in the module hash cache.

    Args:
        value: The literal value (Decimal or None)
        policy: The policy context for the value

    Returns:
        SHA-256 hash string for the literal, or a fallback ID when literal
        tracking is disabled or hashing fails

    Raises:
        Exception: Only if fail_on_error is True in configuration
    """
    try:
        if not should_track_literals():
            return _generate_fallback_id("literal")

        # Normalizing makes 100 and 100.00 hash identically.
        value_text = "None" if value is None else str(value.normalize())
        fingerprint = _get_policy_fingerprint(policy)
        cache_key = f"literal:{value_text}:{fingerprint}"

        known = _get_cached_hash(cache_key)
        if known is not None:
            return known

        digest = hashlib.sha256(cache_key.encode("utf-8")).hexdigest()
        return _cache_hash(cache_key, digest)
    except Exception as exc:
        raw_value = "None" if value is None else str(value)
        log_provenance_error(
            exc,
            "hash_literal",
            value=raw_value,
            policy_type=type(policy).__name__ if policy else "None",
        )
        if should_fail_on_error():
            raise
        # Graceful degradation: return a fallback hash
        return _generate_fallback_id("literal", raw_value)
[docs]
def hash_node(
    op: str,
    parents: tuple[FinancialValue, ...],
    policy: Policy,
    meta: dict | None = None,
) -> str:
    """Generate stable hash for operation nodes.

    The hash covers the operation, the provenance IDs of the parents, the
    policy fingerprint and the serialized metadata (including the current
    calculation span, if any). Parent IDs are sorted only for the commutative
    operators ``"+"`` and ``"*"``. For every other operation operand order is
    part of the identity, so ``a - b`` and ``b - a`` get different IDs.

    Args:
        op: Operation identifier (e.g., "+", "-", "calc:margin")
        parents: Parent FinancialValue instances
        policy: Policy context for the operation
        meta: Optional metadata dictionary

    Returns:
        SHA-256 hash string for the operation node, or a fallback ID when
        operation tracking is disabled or hashing fails

    Raises:
        Exception: Only if fail_on_error is True in configuration
    """
    try:
        # Check if operation tracking is enabled
        if not should_track_operations():
            return _generate_fallback_id("operation", op)

        # Collect one provenance ID per parent; a failing parent gets a
        # fallback ID instead of aborting the whole hash.
        parent_ids = []
        for i, parent in enumerate(parents):
            try:
                if hasattr(parent, "_prov") and parent._prov is not None:
                    parent_id = parent._prov.id
                    # Over-used parents are referenced by a shortened marker
                    if _should_truncate_history(parent_id):
                        parent_id = f"truncated:{parent_id[:16]}"
                else:
                    # Parents without provenance are hashed as literals
                    parent_id = hash_literal(
                        getattr(parent, "_value", None),
                        getattr(parent, "policy", None),
                    )
                parent_ids.append(parent_id)
            except Exception as parent_error:
                log_provenance_error(
                    parent_error, f"hash_node_parent_{i}", operation=op, parent_index=i
                )
                parent_ids.append(_generate_fallback_id("parent", f"{op}_{i}"))

        # Merge caller metadata and span information; either may be dropped
        # on error without failing the hash.
        combined_meta = {}
        if meta:
            try:
                combined_meta.update(meta)
            except Exception as meta_error:
                log_provenance_error(meta_error, "hash_node_meta_merge", operation=op)

        try:
            span_info = _get_current_span_info()
            if span_info:
                combined_meta.update(span_info)
        except Exception as span_error:
            log_provenance_error(span_error, "hash_node_span_info", operation=op)

        try:
            policy_fingerprint = _get_policy_fingerprint(policy)
        except Exception as policy_error:
            log_provenance_error(
                policy_error, "hash_node_policy_fingerprint", operation=op
            )
            policy_fingerprint = "error"

        try:
            meta_str = _serialize_meta(combined_meta) if combined_meta else ""
        except Exception as serialize_error:
            log_provenance_error(
                serialize_error, "hash_node_meta_serialize", operation=op
            )
            meta_str = "error"

        # Sorting is only safe when operand order cannot change the result.
        # Sorting unconditionally made "a - b" and "b - a" (or "a / b" and
        # "b / a") collide on the same provenance ID.
        if op in ("+", "*"):
            ordered_ids = sorted(parent_ids)
        else:
            ordered_ids = parent_ids

        # Create cache key for this operation
        cache_key = (
            f"op:{op}:parents:{':'.join(ordered_ids)}"
            f":policy:{policy_fingerprint}:meta:{meta_str}"
        )

        # Check cache first
        cached_hash = _get_cached_hash(cache_key)
        if cached_hash is not None:
            return cached_hash

        # Generate hash and cache it
        hash_value = hashlib.sha256(cache_key.encode("utf-8")).hexdigest()
        return _cache_hash(cache_key, hash_value)
    except Exception as e:
        log_provenance_error(
            e,
            "hash_node",
            operation=op,
            parent_count=len(parents) if parents else 0,
            has_meta=meta is not None,
        )
        if should_fail_on_error():
            raise
        # Graceful degradation: return a fallback hash
        return _generate_fallback_id("operation", op)
def _generate_fallback_id(category: str, identifier: str = "") -> str:
    """Generate a fallback provenance ID when normal generation fails.

    Uniqueness comes from a random UUID. A millisecond timestamp combined
    with a four-digit random number is not enough: many fallback IDs can be
    requested within one millisecond (for example when literal tracking is
    disabled) and would collide.

    Args:
        category: Category of the fallback (e.g., "literal", "operation")
        identifier: Additional identifier to make the fallback unique

    Returns:
        Fallback provenance ID. Normally a 32-character hex digest; a
        ``fallback_<category>_...`` string if digest generation fails.
    """
    try:
        import time
        import uuid

        timestamp = str(int(time.time() * 1000))  # milliseconds
        unique_part = uuid.uuid4().hex
        if identifier:
            content = f"fallback:{category}:{identifier}:{timestamp}:{unique_part}"
        else:
            content = f"fallback:{category}:{timestamp}:{unique_part}"
        # MD5 is used only to get a fixed-width ID, not for security.
        return hashlib.md5(content.encode("utf-8")).hexdigest()
    except Exception:
        # Digest path unavailable - fall back to a bare UUID fragment
        try:
            import uuid

            return f"fallback_{category}_{uuid.uuid4().hex[:8]}"
        except Exception:
            # Last resort - process ID plus a hash of the identifier
            import os

            pid = os.getpid() if hasattr(os, "getpid") else 0
            return f"fallback_{category}_{pid}_{hash(identifier) % 10000}"
def _get_policy_fingerprint(policy: Policy) -> str:
    """Generate a stable fingerprint for a policy.

    Only ``decimal_places``, ``rounding`` and ``none_text`` are included.
    A missing attribute is rendered as ``unknown`` and one whose access
    fails as ``error``.

    Args:
        policy: The policy to fingerprint

    Returns:
        ``"None"`` for a None policy, otherwise the ``name:value`` entries
        sorted and joined with ``|``; ``"policy_error"`` on failure

    Raises:
        Exception: Only if fail_on_error is True in configuration
    """
    try:
        if policy is None:
            return "None"

        # Simplified fingerprint: a production version would cover every
        # policy field that affects calculations.
        entries = []
        for field_name in ("decimal_places", "rounding", "none_text"):
            try:
                entries.append(
                    f"{field_name}:{getattr(policy, field_name, 'unknown')}"
                )
            except Exception:
                entries.append(f"{field_name}:error")

        entries.sort()
        return "|".join(entries)
    except Exception as exc:
        log_provenance_error(exc, "_get_policy_fingerprint")
        if should_fail_on_error():
            raise
        return "policy_error"
def _serialize_meta(meta: dict) -> str:
    """Serialize metadata dictionary to stable string.

    Entries are emitted as ``key:value`` in sorted key order and joined with
    ``|``. Dict values become ``dict(k=v,...)`` with sorted items, list
    values become ``list(a,b,...)``, anything else uses ``str()``. An entry
    that cannot be rendered is emitted as ``key:error``.

    Args:
        meta: Metadata dictionary

    Returns:
        Stable string representation of metadata; empty string for empty
        metadata; ``"meta_error"`` on failure

    Raises:
        Exception: Only if fail_on_error is True in configuration
    """

    def _render(value):
        # One level of structure is expanded; deeper nesting relies on str().
        if isinstance(value, dict):
            pairs = ",".join(f"{k}={v}" for k, v in sorted(value.items()))
            return f"dict({pairs})"
        if isinstance(value, list):
            return f"list({','.join(map(str, value))})"
        return str(value)

    try:
        if not meta:
            return ""

        rendered = []
        for key in sorted(meta.keys()):
            try:
                rendered.append(f"{key}:{_render(meta[key])}")
            except Exception as item_error:
                log_provenance_error(item_error, "_serialize_meta_item", key=key)
                # Keep the key visible even though its value is unusable
                rendered.append(f"{key}:error")

        return "|".join(rendered)
    except Exception as exc:
        log_provenance_error(exc, "_serialize_meta")
        if should_fail_on_error():
            raise
        return "meta_error"
# ======================== Calculation Span Context Management ========================
def _push_calc_context(name: str, attrs: dict[str, Any]) -> Token:
    """Push a new calculation context onto the span stack.

    A new stack list is built and stored in the context variable; the
    previous list object is left untouched.

    Args:
        name: Name of the calculation span
        attrs: Additional attributes for the span

    Returns:
        Token that can be used to restore the previous context

    Raises:
        Exception: Only if fail_on_error is True in configuration
    """
    try:
        stack_before = _current_span_stack.get([])

        # Keep only attributes that can be stringified; replace the rest
        # with a marker naming the offending type.
        checked_attrs = {}
        if attrs:
            for key, value in attrs.items():
                try:
                    str(value)
                except Exception as attr_error:
                    log_provenance_error(attr_error, "_push_calc_context_attr", key=key)
                    checked_attrs[key] = f"error:{type(value).__name__}"
                else:
                    checked_attrs[key] = value

        frame = {
            "name": str(name),
            "attrs": checked_attrs,
            "depth": len(stack_before),
        }
        return _current_span_stack.set(stack_before + [frame])
    except Exception as exc:
        log_provenance_error(exc, "_push_calc_context", span_name=name)
        if should_fail_on_error():
            raise
        # Re-set the current stack so the caller still gets a usable token
        return _current_span_stack.set(_current_span_stack.get([]))
def _pop_calc_context(token: Token) -> None:
    """Restore the span stack to its state before the matching push.

    Args:
        token: Token returned by _push_calc_context

    Raises:
        Exception: Only if fail_on_error is True in configuration
    """
    try:
        _current_span_stack.reset(token)
        return
    except Exception as exc:
        log_provenance_error(exc, "_pop_calc_context")
        if should_fail_on_error():
            raise

    # Reset failed and errors are tolerated: best effort, empty the stack.
    try:
        _current_span_stack.set([])
    except Exception:
        pass  # Nothing more can be done; carry on
def _get_current_span_info() -> dict[str, Any]:
    """Describe the active calculation span for use in provenance metadata.

    Returns:
        Empty dict when no span is active. Otherwise a dict with ``span``
        (innermost span name), ``span_attrs`` (copy of its attributes, when
        it has any) and, for nested spans, ``span_hierarchy`` (all span
        names, outermost first) plus ``span_depth``.

    Raises:
        Exception: Only if fail_on_error is True in configuration
    """
    try:
        stack = _current_span_stack.get([])
        if not stack:
            return {}

        info = {}

        # Innermost span name - without it there is nothing useful to report
        try:
            innermost = stack[-1]
            info["span"] = innermost.get("name", "unknown")
        except (IndexError, AttributeError, TypeError) as span_error:
            log_provenance_error(span_error, "_get_current_span_info_current")
            return {}

        # Attributes are optional; a failure only drops them
        try:
            if innermost.get("attrs"):
                info["span_attrs"] = innermost["attrs"].copy()
        except Exception as attrs_error:
            log_provenance_error(attrs_error, "_get_current_span_info_attrs")

        # Hierarchy is only reported when spans are nested
        try:
            if len(stack) > 1:
                names = []
                for frame in stack:
                    try:
                        names.append(frame.get("name", "unknown"))
                    except Exception:
                        names.append("error")
                info["span_hierarchy"] = names
                info["span_depth"] = len(stack)
        except Exception as hierarchy_error:
            log_provenance_error(hierarchy_error, "_get_current_span_info_hierarchy")

        return info
    except Exception as exc:
        log_provenance_error(exc, "_get_current_span_info")
        if should_fail_on_error():
            raise
        return {}
[docs]
@contextmanager
def calc_span(name: str, **attrs) -> Generator[None, None, None]:
    """Context manager for grouping calculations under a named span.

    This context manager allows grouping related financial calculations
    under a named span, which will be included in the provenance metadata
    of all operations performed within the span context.

    Exceptions raised by the code inside the ``with`` block always propagate
    unchanged, whether or not spans are enabled. Span setup and cleanup
    failures are logged and only re-raised at setup time when fail_on_error
    is set.

    Args:
        name: Name of the calculation span
        **attrs: Additional attributes to associate with the span

    Yields:
        None

    Example:
        >>> with calc_span("quarterly_analysis", quarter="Q1", year=2024):
        ...     revenue = FinancialValue(1000)
        ...     cost = FinancialValue(600)
        ...     profit = revenue - cost  # Will include span info in provenance
        >>> prov = profit.get_provenance()
        >>> print(prov.meta.get("span"))  # "quarterly_analysis"
        >>> print(prov.meta.get("span_attrs"))  # {"quarter": "Q1", "year": 2024}
    """
    token = None
    try:
        # Spans are on unless a configuration object explicitly disables them
        config = get_config()
        spans_enabled = not config or getattr(config, "enable_spans", True)
        if spans_enabled:
            token = _push_calc_context(name, attrs)
    except Exception as setup_error:
        # Log span setup errors but don't break user code
        log_provenance_error(setup_error, "calc_span_setup", span_name=name)
        if should_fail_on_error():
            raise

    # There must be exactly one yield, and it must sit outside the setup
    # handler. Yielding inside it made exceptions from the ``with`` body look
    # like setup errors, after which the generator yielded a second time and
    # contextlib raised RuntimeError instead of the user's exception.
    try:
        yield
    finally:
        # Only pop what was actually pushed
        if token is not None:
            try:
                _pop_calc_context(token)
            except Exception as cleanup_error:
                log_provenance_error(cleanup_error, "calc_span_cleanup", span_name=name)
# ======================== Export and Analysis Functions ========================
[docs]
def get_provenance_graph(fv: FinancialValue) -> dict[str, Provenance]:
    """Extract the provenance graph reachable from a value as a dictionary.

    Note: provenance records only store the IDs of their inputs, and this
    module keeps no global store that maps IDs back to records. The result
    therefore contains at most the root record of ``fv``. A full traversal
    would require either:

    1. A global provenance registry, or
    2. Maintaining references to parent FinancialValue instances

    The ``max_graph_size`` configuration value (default 10000) is honoured:
    when the limit leaves no room for the root record, the condition is
    logged and an empty graph is returned.

    Args:
        fv: FinancialValue to extract provenance graph from

    Returns:
        Dictionary mapping provenance IDs to Provenance records

    Raises:
        Exception: Only if fail_on_error is True in configuration

    Example:
        >>> revenue = FinancialValue(1000)
        >>> cost = FinancialValue(600)
        >>> profit = revenue - cost
        >>> graph = get_provenance_graph(profit)
        >>> print(len(graph))  # 1 (only profit, as we can't traverse to inputs)
    """
    try:
        if not hasattr(fv, "has_provenance") or not fv.has_provenance():
            return {}

        config = get_config()
        max_size = getattr(config, "max_graph_size", 10000) if config else 10000

        graph: dict[str, Provenance] = {}

        root_prov = fv.get_provenance()
        if not root_prov:
            return graph

        try:
            if len(graph) >= max_size:
                # Only possible for a non-positive limit while the graph
                # holds just the root; kept so the limit stays authoritative.
                log_provenance_error(
                    Exception(f"Graph size limit exceeded: {max_size}"),
                    "get_provenance_graph_size_limit",
                )
            else:
                graph[root_prov.id] = root_prov
        except Exception as traverse_error:
            log_provenance_error(
                traverse_error, "get_provenance_graph_traverse", prov_id=root_prov.id
            )

        return graph
    except Exception as e:
        log_provenance_error(e, "get_provenance_graph")
        if should_fail_on_error():
            raise
        return {}
[docs]
def to_trace_json(fv: FinancialValue) -> dict[str, Any]:
    """Export the provenance graph as a JSON-serializable dictionary.

    The result has a ``root`` key with the root provenance ID and a
    ``nodes`` dictionary with one entry per record returned by
    ``get_provenance_graph``. Metadata values that ``json.dumps`` rejects are
    replaced by their ``str()`` form. A record that cannot be converted is
    exported as an ``"error"`` node instead of aborting the export.

    Args:
        fv: FinancialValue to export provenance graph from

    Returns:
        Dictionary with 'root' and 'nodes' keys. Without provenance both are
        empty (``root`` is None). If the export itself fails, an additional
        ``"error": "export_failed"`` key is present.

    Raises:
        Exception: Only if fail_on_error is True in configuration

    Example:
        >>> revenue = FinancialValue(1000)
        >>> cost = FinancialValue(600)
        >>> profit = revenue - cost
        >>> trace = to_trace_json(profit)
        >>> print(trace['root'])  # profit provenance ID
        >>> print(len(trace['nodes']))  # 1 (the graph only holds the root record)
    """
    try:
        # Imported once per call rather than once per metadata value
        import json

        if not hasattr(fv, "has_provenance") or not fv.has_provenance():
            return {"root": None, "nodes": {}}

        root_prov = fv.get_provenance()
        if not root_prov:
            return {"root": None, "nodes": {}}

        # Get the provenance graph with error handling
        try:
            graph = get_provenance_graph(fv)
        except Exception as graph_error:
            log_provenance_error(graph_error, "to_trace_json_get_graph")
            if should_fail_on_error():
                raise
            graph = {}

        # Convert to JSON-serializable format
        nodes = {}
        for prov_id, prov in graph.items():
            try:
                node_data = {
                    "id": str(prov.id),
                    "op": str(prov.op),
                    "inputs": [],
                    "meta": {},
                }

                # Safely convert inputs
                try:
                    node_data["inputs"] = [str(input_id) for input_id in prov.inputs]
                except Exception as inputs_error:
                    log_provenance_error(
                        inputs_error, "to_trace_json_inputs", prov_id=prov_id
                    )
                    node_data["inputs"] = ["error"]

                # Safely convert metadata
                try:
                    if prov.meta:
                        safe_meta = {}
                        for key, value in prov.meta.items():
                            try:
                                # Probe serializability; the output is discarded
                                json.dumps(value)
                                safe_meta[str(key)] = value
                            except (TypeError, ValueError):
                                # Convert non-serializable values to strings
                                safe_meta[str(key)] = str(value)
                        node_data["meta"] = safe_meta
                except Exception as meta_error:
                    log_provenance_error(
                        meta_error, "to_trace_json_meta", prov_id=prov_id
                    )
                    node_data["meta"] = {"error": "metadata_conversion_failed"}

                nodes[prov_id] = node_data
            except Exception as node_error:
                log_provenance_error(node_error, "to_trace_json_node", prov_id=prov_id)
                # Include error node
                nodes[prov_id] = {
                    "id": str(prov_id),
                    "op": "error",
                    "inputs": [],
                    "meta": {"error": "node_conversion_failed"},
                }

        return {"root": str(root_prov.id), "nodes": nodes}
    except Exception as e:
        log_provenance_error(e, "to_trace_json")
        if should_fail_on_error():
            raise
        return {"root": None, "nodes": {}, "error": "export_failed"}
def _validate_provenance_graph(graph: dict[str, Provenance]) -> bool:
    """Check that every input reference in a provenance graph is usable.

    Only the form of the references is checked: each input ID must be a
    non-empty string. Whether the referenced ID is present in ``graph`` is
    not verified, because the graphs produced by this module hold a single
    node.

    Args:
        graph: Dictionary mapping provenance IDs to Provenance records

    Returns:
        True if the graph is empty or all input IDs are non-empty strings,
        False otherwise
    """
    if not graph:
        return True

    return all(
        isinstance(input_id, str) and bool(input_id)
        for record in graph.values()
        for input_id in record.inputs
    )
def _format_provenance_summary(fv: FinancialValue) -> str:
    """Build a one-line summary of a value's provenance.

    Args:
        fv: FinancialValue to summarize

    Returns:
        ``"No provenance"`` when the value has none; otherwise the operation,
        the number of inputs (if any) and the span name (if recorded),
        separated by ``" | "``
    """
    record = fv.get_provenance() if fv.has_provenance() else None
    if not record:
        return "No provenance"

    fragments = [f"Op: {record.op}"]

    if record.inputs:
        fragments.append(f"Inputs: {len(record.inputs)}")

    if record.meta and "span" in record.meta:
        fragments.append(f"Span: {record.meta['span']}")

    return " | ".join(fragments)
[docs]
def explain(fv: FinancialValue, max_depth: int = 10) -> str:
    """Generate human-readable explanation of calculation.

    The text starts with a ``Value: ...`` line. For a value with provenance
    it is followed by a description of the root record: either ``Literal``
    (with the recorded value, if any) or ``Operation: <op>`` with selected
    metadata and a list of abbreviated input IDs. Input records themselves
    are not expanded, since only their IDs are stored on the root record.

    Args:
        fv: FinancialValue to explain
        max_depth: Maximum depth to describe; a node deeper than this is
            rendered as ``... (max depth reached)``

    Returns:
        Human-readable string explaining the calculation, or
        ``"Error generating explanation"`` on failure

    Raises:
        Exception: Only if fail_on_error is True in configuration
    """
    try:
        # The value text is best effort; failure must not block the rest
        try:
            value_str = fv.as_str()
        except Exception as value_error:
            log_provenance_error(value_error, "explain_value_str")
            value_str = "error"

        root_prov = None
        if hasattr(fv, "has_provenance") and fv.has_provenance():
            root_prov = fv.get_provenance()
        if not root_prov:
            return f"Value: {value_str} (no provenance available)"

        def _describe(prov: Provenance, depth: int = 0, prefix: str = "") -> str:
            """Render one provenance record as text."""
            try:
                if depth > max_depth:
                    return f"{prefix}... (max depth reached)"

                indent = " " * depth

                try:
                    op = str(prov.op)
                except Exception:
                    op = "unknown"

                if op == "literal":
                    # Show the literal's value when the metadata carries it
                    value_info = ""
                    try:
                        if prov.meta and "value" in prov.meta:
                            value_info = f" ({prov.meta['value']})"
                    except Exception:
                        pass
                    return f"{indent}{prefix}Literal{value_info}"

                # Text is collected piecewise so that a failure part-way
                # through still returns everything gathered so far.
                pieces = [f"{indent}{prefix}Operation: {op}"]

                # Selected metadata, in a fixed order
                try:
                    if prov.meta:
                        labelled = []
                        for key, label in (
                            ("input_names", "inputs"),
                            ("span", "span"),
                            ("calculation", "calc"),
                            ("conversion", "conversion"),
                        ):
                            try:
                                if key in prov.meta:
                                    labelled.append(f"{label}: {prov.meta[key]}")
                            except Exception:
                                pass
                        if labelled:
                            pieces.append(f" ({', '.join(labelled)})")
                except Exception as meta_error:
                    log_provenance_error(meta_error, "explain_node_meta", op=op)

                # Input IDs, shortened to their first eight characters
                try:
                    if prov.inputs:
                        pieces.append(
                            f"\n{indent} Inputs: {len(prov.inputs)} operand(s)"
                        )
                        for i, input_id in enumerate(prov.inputs):
                            try:
                                input_str = str(input_id)
                                if len(input_str) > 8:
                                    display_id = input_str[:8] + "..."
                                else:
                                    display_id = input_str
                                pieces.append(f"\n{indent} [{i}]: {display_id}")
                            except Exception:
                                pieces.append(f"\n{indent} [{i}]: error")
                except Exception as inputs_error:
                    log_provenance_error(inputs_error, "explain_node_inputs", op=op)

                return "".join(pieces)
            except Exception as node_error:
                log_provenance_error(node_error, "explain_node", depth=depth)
                return f"{prefix}Error explaining node at depth {depth}"

        header = f"Value: {value_str}\n"
        try:
            tree_text = _describe(root_prov)
        except Exception as explain_error:
            log_provenance_error(explain_error, "explain_root_node")
            tree_text = "Error explaining calculation tree"

        return header + tree_text
    except Exception as exc:
        log_provenance_error(exc, "explain")
        if should_fail_on_error():
            raise
        return "Error generating explanation"