Source code for metricengine.reductions

from __future__ import annotations

from collections.abc import Iterable, Sequence

from .exceptions import CalculationError
from .null_behaviour import NullReductionMode, get_nulls
from .policy import Policy
from .policy_context import PolicyResolution, get_policy, use_policy_resolution
from .units import Dimensionless
from .utils import SupportsDecimal
from .value import FinancialValue as FV


def _active_policy(fallback: Policy | None = None) -> Policy:
    """
    Return a non-None policy: prefer ambient, else provided fallback, else DEFAULT_POLICY.
    Late-import DEFAULT_POLICY to avoid circulars at package import time.
    """
    pol = get_policy()
    if pol is not None:
        return pol
    if fallback is not None:
        return fallback
    from .policy import DEFAULT_POLICY  # late import

    return DEFAULT_POLICY


def _is_noneish(x: FV | None | object) -> bool:
    return (x is None) or (isinstance(x, FV) and x.is_none())


def _pick_policy_for_items(
    items: Iterable[SupportsDecimal],
    explicit_policy: Policy | None = None,
) -> Policy:
    """
    Choose a concrete Policy:
      1) explicit_policy (if provided) - takes precedence
      2) First non-None FV's policy
      3) Else active/ambient (or DEFAULT)
    """
    # Explicit policy takes precedence
    if explicit_policy is not None:
        return explicit_policy

    for x in items:
        if isinstance(x, FV) and not x.is_none():
            return x.policy
        if (x is not None) and not isinstance(x, FV):
            # first primitive value: use active/default
            return _active_policy()
    return _active_policy()


def _pick_unit_for_items(items: Iterable[SupportsDecimal]) -> type:
    """First non-None FV's unit; otherwise Dimensionless."""
    for x in items:
        if isinstance(x, FV) and not x.is_none():
            return x.unit
    return Dimensionless


def _to_fv(x: SupportsDecimal, *, policy: Policy, unit: type) -> FV:
    """Coerce primitive or FV to FV with the chosen policy/unit."""
    return x if isinstance(x, FV) else FV(x, policy=policy, unit=unit)


[docs] def fv_sum( items: Sequence[SupportsDecimal], *, mode: NullReductionMode | None = None, policy: Policy | None = None, ) -> FV: mode = mode or get_nulls().reduction seq = list(items) result_policy = _pick_policy_for_items(seq, explicit_policy=policy) result_unit = _pick_unit_for_items(seq) total = FV.zero(result_policy, unit=result_unit) saw_none = False saw_value = False # Preserve accumulator's policy with use_policy_resolution(PolicyResolution.LEFT_OPERAND): for x in seq: if _is_noneish(x): saw_none = True if mode is NullReductionMode.RAISE: raise CalculationError("Reduction encountered None") continue total = total + _to_fv(x, policy=result_policy, unit=result_unit) saw_value = True if mode is NullReductionMode.PROPAGATE and saw_none: return FV.none(result_policy) if mode is NullReductionMode.SKIP: return total if saw_value else FV.none(result_policy) if mode is NullReductionMode.ZERO: # sum([] of None) ⇒ 0 per your test note return total # Fallback behaves like SKIP return total if saw_value else FV.none(result_policy)
[docs] def fv_mean( items: Iterable[SupportsDecimal], *, mode: NullReductionMode | None = None, policy: Policy | None = None, ) -> FV: mode = mode or get_nulls().reduction seq = list(items) result_policy = _pick_policy_for_items(seq, explicit_policy=policy) result_unit = _pick_unit_for_items(seq) if mode is NullReductionMode.RAISE: if any(_is_noneish(x) for x in seq): raise CalculationError("Reduction encountered None") if mode is NullReductionMode.PROPAGATE: if any(_is_noneish(x) for x in seq): return FV.none(result_policy) n = len(seq) if n == 0: return FV.none(result_policy) s = fv_sum(seq, mode=NullReductionMode.SKIP, policy=result_policy) with use_policy_resolution(PolicyResolution.LEFT_OPERAND): return s / FV(n, policy=result_policy, unit=Dimensionless) if mode is NullReductionMode.SKIP: non_nulls: list[FV] = [ _to_fv(x, policy=result_policy, unit=result_unit) for x in seq if not _is_noneish(x) ] if not non_nulls: return FV.none(result_policy) s = fv_sum(non_nulls, mode=NullReductionMode.SKIP, policy=result_policy) if s == 0: return FV.zero(s.policy, unit=result_unit) with use_policy_resolution(PolicyResolution.LEFT_OPERAND): return s / FV(len(non_nulls), policy=result_policy, unit=Dimensionless) if mode is NullReductionMode.ZERO: n = len(seq) if n == 0: return FV.none(result_policy) if all(_is_noneish(x) for x in seq): # explicit test expectation return FV.none(result_policy) s = fv_sum(seq, mode=NullReductionMode.ZERO, policy=result_policy) with use_policy_resolution(PolicyResolution.LEFT_OPERAND): return s / FV(n, policy=result_policy, unit=Dimensionless) # Fallback → SKIP behaviour non_nulls: list[FV] = [ _to_fv(x, policy=result_policy, unit=result_unit) for x in seq if not _is_noneish(x) ] if not non_nulls: return FV.none(result_policy) s = fv_sum(non_nulls, mode=NullReductionMode.SKIP, policy=result_policy) with use_policy_resolution(PolicyResolution.LEFT_OPERAND): return s / FV(len(non_nulls), policy=result_policy, unit=Dimensionless)
[docs] def fv_weighted_mean( items: Iterable[tuple[SupportsDecimal, SupportsDecimal]], *, mode: NullReductionMode | None = None, policy: Policy | None = None, ) -> FV: """ Weighted mean of values (v_i) with weights (w_i): sum(v_i * w_i) / sum(w_i). ZERO mode semantics: - Treat value None as 0 (weight still counts). - Treat weight None as 0 (pair contributes nothing). SKIP mode semantics: - Skip pairs where value or weight is None. PROPAGATE mode semantics: - Any None → propagate None (handled early). RAISE mode semantics: - Any None → raise (handled early). """ mode = mode or get_nulls().reduction seq = list(items) values = [v for v, _ in seq] weights = [w for _, w in seq] result_policy = _pick_policy_for_items(values + weights, explicit_policy=policy) result_unit = _pick_unit_for_items(values) if mode is NullReductionMode.RAISE: if any(_is_noneish(x) for x in values + weights): raise CalculationError("Reduction encountered None") if mode is NullReductionMode.PROPAGATE: if any(_is_noneish(x) for x in values + weights): return FV.none(result_policy) if not seq: return FV.none(result_policy) valid_pairs: list[tuple[FV, FV]] = [] for val, weight in seq: v_is_none = _is_noneish(val) w_is_none = _is_noneish(weight) if mode is NullReductionMode.SKIP: if v_is_none or w_is_none: continue v_fv = _to_fv(val, policy=result_policy, unit=result_unit) w_fv = _to_fv(weight, policy=result_policy, unit=Dimensionless) valid_pairs.append((v_fv, w_fv)) continue if mode is NullReductionMode.ZERO: # value None → treat as 0 with weight included v_fv = ( FV.zero(result_policy, unit=result_unit) if v_is_none else _to_fv(val, policy=result_policy, unit=result_unit) ) # weight None → treat as 0 (drops pair contribution) w_fv = ( FV.zero(result_policy, unit=Dimensionless) if w_is_none else _to_fv(weight, policy=result_policy, unit=Dimensionless) ) # If weight becomes zero, this pair won't affect numerator nor denominator valid_pairs.append((v_fv, w_fv)) continue # Fallback (SKIP-like): keep only fully valid pairs if not v_is_none and not w_is_none: v_fv = _to_fv(val, policy=result_policy, unit=result_unit) w_fv = _to_fv(weight, policy=result_policy, unit=Dimensionless) valid_pairs.append((v_fv, w_fv)) if not valid_pairs: # In ZERO mode an all-None set ⇒ 0/0 → None by convention return FV.none(result_policy) weighted_sum = FV.zero(result_policy, unit=result_unit) total_weight = FV.zero(result_policy, unit=Dimensionless) with use_policy_resolution(PolicyResolution.LEFT_OPERAND): for v, w in valid_pairs: weighted_sum = weighted_sum + (v * w) total_weight = total_weight + w if total_weight == 0: return FV.none(result_policy) return weighted_sum / total_weight