# ****************************************************************************** # @copyright (C) 2026 Zara-Toorox - Solar Forecast Stats x86 DB-Version part of Solar Forecast ML DB # * This program is protected by a Proprietary Non-Commercial License. # 1. Personal and Educational use only. # 2. COMMERCIAL USE AND AI TRAINING ARE STRICTLY PROHIBITED. # 3. Clear attribution to "Zara-Toorox" is required. # * Full license terms: https://github.com/Zara-Toorox/ha-solar-forecast-ml/blob/main/LICENSE # ****************************************************************************** """Annual yield forecast reader for SFML Stats. @zara""" from __future__ import annotations import logging import math from contextlib import asynccontextmanager from dataclasses import dataclass, field from datetime import date, datetime, timedelta from pathlib import Path from typing import Any, AsyncIterator, TYPE_CHECKING import aiosqlite from ..const import SOLAR_FORECAST_DB if TYPE_CHECKING: from ..storage.db_connection_manager import DatabaseConnectionManager _LOGGER = logging.getLogger(__name__) # Berlin TMY clear-sky index per month (ratio of actual/clear-sky irradiation). # Derived from DWD TRY 2015 reference year for Central European lowlands. # Used to extrapolate months without measured data. _TMY_CLEARSKY_INDEX = { 1: 0.28, 2: 0.34, 3: 0.42, 4: 0.50, 5: 0.52, 6: 0.53, 7: 0.51, 8: 0.49, 9: 0.44, 10: 0.35, 11: 0.26, 12: 0.23, } # Approximate daylight hours per month at 52°N for physics baseline scaling. _DAYLIGHT_HOURS_52N = { 1: 8.3, 2: 9.8, 3: 11.8, 4: 13.9, 5: 15.6, 6: 16.6, 7: 16.1, 8: 14.6, 9: 12.7, 10: 10.7, 11: 8.8, 12: 7.9, } @dataclass class MonthlyYield: """Monthly yield summary with measured and projected values. @zara""" year: int month: int measured_days: int total_days_in_month: int # Measured values (from actual data) measured_yield_kwh: float measured_avg_daily_kwh: float measured_min_day_kwh: float measured_max_day_kwh: float measured_avg_peak_w: float measured_avg_production_hours: float measured_avg_accuracy: float # Projected values (full month estimate) projected_yield_kwh: float source: str # "measured", "partial", "estimated" # Self-consumption data (if available) self_consumption_kwh: float = 0.0 grid_export_kwh: float = 0.0 avg_autarky_percent: float = 0.0 @dataclass class CalibrationData: """Panel group calibration state. @zara""" group_name: str global_factor: float sample_count: int confidence: float @dataclass class AnnualForecast: """Complete annual yield forecast result. @zara""" # System info latitude: float longitude: float installed_kwp: float panel_groups: list[CalibrationData] # Data coverage first_data_date: date last_data_date: date total_measured_days: int # Monthly breakdown months: list[MonthlyYield] # Aggregated annual values annual_yield_kwh: float specific_yield_kwh_per_kwp: float performance_ratio: float best_month: int worst_month: int # Confidence measured_months: int estimated_months: int confidence_percent: float # Yield bands optimistic_yield_kwh: float pessimistic_yield_kwh: float # Records record_day_kwh: float record_day_date: str record_peak_w: float class AnnualForecastReader: """Reads historical data and computes an annual yield forecast. @zara""" _db_manager: DatabaseConnectionManager | None = None def __init__( self, config_path: Path, db_manager: DatabaseConnectionManager | None = None, ) -> None: """Initialize the annual forecast reader. @zara""" self._config_path = config_path self._db_path = config_path / SOLAR_FORECAST_DB if db_manager is not None: AnnualForecastReader._db_manager = db_manager @property def is_available(self) -> bool: """Check if database is accessible. @zara""" if self._db_manager is not None: return self._db_manager.is_available return self._db_path.exists() @asynccontextmanager async def _get_db_connection(self) -> AsyncIterator[aiosqlite.Connection]: """Get a database connection from the manager. @zara""" from ..storage.db_connection_manager import get_manager manager = get_manager() if manager is not None and manager.is_connected: yield await manager.get_connection() else: conn = await aiosqlite.connect(str(self._db_path)) conn.row_factory = aiosqlite.Row try: yield conn finally: await conn.close() async def async_get_annual_forecast(self) -> AnnualForecast | None: """Compute a full annual yield forecast from historical data. @zara""" if not self.is_available: _LOGGER.debug("Database not found: %s", self._db_path) return None try: async with self._get_db_connection() as conn: system_info = await self._get_system_info(conn) calibration = await self._get_calibration_data(conn) monthly_data = await self._get_monthly_summaries(conn) energy_data = await self._get_monthly_energy(conn) records = await self._get_records(conn) data_range = await self._get_data_range(conn) if not system_info: _LOGGER.warning("No system info found in database") return None latitude = system_info["latitude"] installed_kwp = system_info["installed_kwp"] # Build 12-month projection months = self._build_monthly_projection( monthly_data, energy_data, installed_kwp, latitude ) annual_yield = sum(m.projected_yield_kwh for m in months) specific_yield = annual_yield / installed_kwp if installed_kwp > 0 else 0 # Theoretical maximum (clear sky, no losses) theoretical_max = self._compute_theoretical_annual(installed_kwp, latitude) performance_ratio = annual_yield / theoretical_max if theoretical_max > 0 else 0 best_month = max(months, key=lambda m: m.projected_yield_kwh).month worst_month = min(months, key=lambda m: m.projected_yield_kwh).month measured_months = sum(1 for m in months if m.source == "measured") partial_months = sum(1 for m in months if m.source == "partial") estimated_months = sum(1 for m in months if m.source == "estimated") # Confidence based on data coverage confidence = (measured_months * 100 + partial_months * 60) / 12 # Yield bands from daily variance optimistic, pessimistic = self._compute_yield_bands( months, annual_yield ) return AnnualForecast( latitude=latitude, longitude=system_info["longitude"], installed_kwp=installed_kwp, panel_groups=calibration, first_data_date=data_range["first"], last_data_date=data_range["last"], total_measured_days=data_range["count"], months=months, annual_yield_kwh=round(annual_yield, 1), specific_yield_kwh_per_kwp=round(specific_yield, 0), performance_ratio=round(performance_ratio * 100, 1), best_month=best_month, worst_month=worst_month, measured_months=measured_months, estimated_months=estimated_months, confidence_percent=round(confidence, 1), optimistic_yield_kwh=round(optimistic, 1), pessimistic_yield_kwh=round(pessimistic, 1), record_day_kwh=records.get("max_kwh", 0), record_day_date=records.get("max_date", ""), record_peak_w=records.get("peak_w", 0), ) except Exception as err: _LOGGER.error("Error computing annual forecast: %s", err) return None # ------------------------------------------------------------------------- # Database queries # ------------------------------------------------------------------------- async def _get_system_info(self, conn: aiosqlite.Connection) -> dict | None: """Read system info from astronomy_system_info. @zara""" try: async with conn.execute( """SELECT latitude, longitude, installed_capacity_kwp, max_peak_record_kwh, max_peak_date, max_peak_hour FROM astronomy_system_info WHERE id = 1""" ) as cursor: row = await cursor.fetchone() if row: return { "latitude": row["latitude"], "longitude": row["longitude"], "installed_kwp": row["installed_capacity_kwp"] or 0, } except Exception as err: _LOGGER.debug("Error reading system info: %s", err) return None async def _get_calibration_data( self, conn: aiosqlite.Connection ) -> list[CalibrationData]: """Read panel group calibration factors. @zara""" result: list[CalibrationData] = [] try: async with conn.execute( """SELECT group_name, global_factor, sample_count, confidence FROM physics_calibration_groups""" ) as cursor: rows = await cursor.fetchall() for row in rows: result.append(CalibrationData( group_name=row["group_name"], global_factor=row["global_factor"], sample_count=row["sample_count"], confidence=row["confidence"], )) except Exception as err: _LOGGER.debug("Error reading calibration data: %s", err) return result async def _get_monthly_summaries( self, conn: aiosqlite.Connection ) -> dict[str, dict]: """Aggregate daily_summaries by year-month. @zara""" result: dict[str, dict] = {} try: async with conn.execute( """SELECT strftime('%Y-%m', date) AS ym, COUNT(*) AS days, ROUND(AVG(actual_total_kwh), 3) AS avg_daily, ROUND(SUM(actual_total_kwh), 3) AS total_kwh, ROUND(MIN(actual_total_kwh), 3) AS min_day, ROUND(MAX(actual_total_kwh), 3) AS max_day, ROUND(AVG(peak_power_w), 0) AS avg_peak_w, ROUND(AVG(production_hours), 1) AS avg_prod_hours, ROUND(AVG(accuracy_percent), 1) AS avg_accuracy FROM daily_summaries WHERE actual_total_kwh > 0 GROUP BY strftime('%Y-%m', date) ORDER BY ym""" ) as cursor: rows = await cursor.fetchall() for row in rows: result[row["ym"]] = { "days": row["days"], "avg_daily": row["avg_daily"] or 0, "total_kwh": row["total_kwh"] or 0, "min_day": row["min_day"] or 0, "max_day": row["max_day"] or 0, "avg_peak_w": row["avg_peak_w"] or 0, "avg_prod_hours": row["avg_prod_hours"] or 0, "avg_accuracy": row["avg_accuracy"] or 0, } except Exception as err: _LOGGER.debug("Error reading monthly summaries: %s", err) return result async def _get_monthly_energy( self, conn: aiosqlite.Connection ) -> dict[str, dict]: """Aggregate energy stats by year-month. @zara""" result: dict[str, dict] = {} try: async with conn.execute( """SELECT strftime('%Y-%m', date) AS ym, ROUND(SUM(self_consumption_kwh), 2) AS self_consumed, ROUND(SUM(grid_export_kwh), 2) AS exported, ROUND(AVG(autarkie_percent), 1) AS avg_autarky FROM stats_daily_energy WHERE solar_yield_kwh > 0 GROUP BY strftime('%Y-%m', date) ORDER BY ym""" ) as cursor: rows = await cursor.fetchall() for row in rows: result[row["ym"]] = { "self_consumed": row["self_consumed"] or 0, "exported": row["exported"] or 0, "avg_autarky": row["avg_autarky"] or 0, } except Exception as err: _LOGGER.debug("Error reading monthly energy: %s", err) return result async def _get_records(self, conn: aiosqlite.Connection) -> dict: """Get all-time records. @zara""" result: dict[str, Any] = {} try: async with conn.execute( """SELECT date, actual_total_kwh, peak_power_w FROM daily_summaries WHERE actual_total_kwh > 0 ORDER BY actual_total_kwh DESC LIMIT 1""" ) as cursor: row = await cursor.fetchone() if row: result["max_kwh"] = row["actual_total_kwh"] result["max_date"] = row["date"] async with conn.execute( """SELECT max_peak_record_kwh FROM astronomy_system_info WHERE id = 1""" ) as cursor: row = await cursor.fetchone() if row: result["peak_w"] = row["max_peak_record_kwh"] or 0 except Exception as err: _LOGGER.debug("Error reading records: %s", err) return result async def _get_data_range(self, conn: aiosqlite.Connection) -> dict: """Get first/last date and total days with data. @zara""" try: async with conn.execute( """SELECT MIN(date) AS first_date, MAX(date) AS last_date, COUNT(*) AS total_days FROM daily_summaries WHERE actual_total_kwh > 0""" ) as cursor: row = await cursor.fetchone() if row and row["first_date"]: return { "first": date.fromisoformat(row["first_date"]), "last": date.fromisoformat(row["last_date"]), "count": row["total_days"], } except Exception as err: _LOGGER.debug("Error reading data range: %s", err) return {"first": date.today(), "last": date.today(), "count": 0} # ------------------------------------------------------------------------- # Projection logic # ------------------------------------------------------------------------- def _build_monthly_projection( self, monthly_data: dict[str, dict], energy_data: dict[str, dict], installed_kwp: float, latitude: float, ) -> list[MonthlyYield]: """Build a 12-month projection mixing measured and estimated data. @zara""" import calendar today = date.today() current_year = today.year months: list[MonthlyYield] = [] for month_num in range(1, 13): days_in_month = calendar.monthrange(current_year, month_num)[1] # Find the best matching data for this calendar month measured = self._find_best_month_data(monthly_data, month_num) energy = self._find_best_month_energy(energy_data, month_num) if measured and measured["days"] >= days_in_month * 0.8: # Full month of data (80%+ coverage) — use as-is projected = measured["total_kwh"] # Scale to full month if slightly incomplete if measured["days"] < days_in_month: projected = measured["avg_daily"] * days_in_month source = "measured" elif measured and measured["days"] >= 5: # Partial month — extrapolate from daily average projected = measured["avg_daily"] * days_in_month source = "partial" else: # No data — estimate from physics + climate projected = self._estimate_month( month_num, installed_kwp, latitude ) measured = { "days": 0, "avg_daily": projected / days_in_month, "total_kwh": 0, "min_day": 0, "max_day": 0, "avg_peak_w": 0, "avg_prod_hours": 0, "avg_accuracy": 0, } source = "estimated" months.append(MonthlyYield( year=current_year, month=month_num, measured_days=measured["days"], total_days_in_month=days_in_month, measured_yield_kwh=round(measured["total_kwh"], 2), measured_avg_daily_kwh=round(measured["avg_daily"], 2), measured_min_day_kwh=round(measured["min_day"], 2), measured_max_day_kwh=round(measured["max_day"], 2), measured_avg_peak_w=round(measured["avg_peak_w"], 0), measured_avg_production_hours=round(measured["avg_prod_hours"], 1), measured_avg_accuracy=round(measured["avg_accuracy"], 1), projected_yield_kwh=round(projected, 2), source=source, self_consumption_kwh=round(energy.get("self_consumed", 0), 2) if energy else 0, grid_export_kwh=round(energy.get("exported", 0), 2) if energy else 0, avg_autarky_percent=round(energy.get("avg_autarky", 0), 1) if energy else 0, )) return months def _find_best_month_data( self, monthly_data: dict[str, dict], target_month: int ) -> dict | None: """Find the best available data for a given calendar month. @zara""" # Prefer most recent year with data for this month candidates = [] for ym, data in monthly_data.items(): try: parts = ym.split("-") if int(parts[1]) == target_month: candidates.append((int(parts[0]), data)) except (ValueError, IndexError): continue if not candidates: return None # Return data from the most recent year candidates.sort(key=lambda x: x[0], reverse=True) return candidates[0][1] def _find_best_month_energy( self, energy_data: dict[str, dict], target_month: int ) -> dict | None: """Find energy data for a given calendar month. @zara""" candidates = [] for ym, data in energy_data.items(): try: parts = ym.split("-") if int(parts[1]) == target_month: candidates.append((int(parts[0]), data)) except (ValueError, IndexError): continue if not candidates: return None candidates.sort(key=lambda x: x[0], reverse=True) return candidates[0][1] def _estimate_month( self, month: int, installed_kwp: float, latitude: float ) -> float: """Estimate monthly yield from physics baseline and climate data. @zara""" import calendar days = calendar.monthrange(date.today().year, month)[1] daylight = _DAYLIGHT_HOURS_52N.get(month, 12.0) # Adjust daylight for actual latitude (simple linear scaling) lat_factor = 1.0 + (latitude - 52.0) * 0.01 # Peak sun hours = daylight * clear-sky index * atmospheric losses clearsky_idx = _TMY_CLEARSKY_INDEX.get(month, 0.40) peak_sun_hours = daylight * clearsky_idx * 0.75 * lat_factor # Monthly yield = kWp * PSH * days * system_efficiency system_efficiency = 0.82 monthly_kwh = installed_kwp * peak_sun_hours * days * system_efficiency return monthly_kwh def _compute_theoretical_annual( self, installed_kwp: float, latitude: float ) -> float: """Compute theoretical annual clear-sky yield. @zara""" import calendar total = 0.0 year = date.today().year for month in range(1, 13): days = calendar.monthrange(year, month)[1] daylight = _DAYLIGHT_HOURS_52N.get(month, 12.0) lat_factor = 1.0 + (latitude - 52.0) * 0.01 # Clear-sky: no cloud losses, only basic atmospheric + system total += installed_kwp * daylight * 0.75 * lat_factor * days * 0.82 return total def _compute_yield_bands( self, months: list[MonthlyYield], expected: float ) -> tuple[float, float]: """Compute optimistic and pessimistic yield bands. @zara""" # Use measured variance where available measured = [m for m in months if m.source != "estimated"] if not measured: return expected * 1.15, expected * 0.85 # Compute coefficient of variation from daily averages daily_values = [m.measured_avg_daily_kwh for m in measured if m.measured_avg_daily_kwh > 0] if len(daily_values) < 3: return expected * 1.15, expected * 0.85 mean_val = sum(daily_values) / len(daily_values) variance = sum((v - mean_val) ** 2 for v in daily_values) / len(daily_values) cv = math.sqrt(variance) / mean_val if mean_val > 0 else 0.15 # Bound the variation to a reasonable range spread = min(max(cv * 0.5, 0.08), 0.20) return expected * (1 + spread), expected * (1 - spread)