Source code for spxquery.processing.lightcurve

"""
Light curve generation and CSV export for SPHEREx time-domain analysis.
"""

import logging
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List

import pandas as pd

from ..core.config import PhotometryResult, Source
from ..utils.spherex_mef import format_flag_binary

logger = logging.getLogger(__name__)


[docs] def generate_lightcurve_dataframe(photometry_results: List[PhotometryResult], source: Source) -> pd.DataFrame: """ Generate light curve DataFrame from photometry results. Parameters ---------- photometry_results : List[PhotometryResult] List of photometry measurements source : Source Source information Returns ------- pd.DataFrame Light curve data with all measurements """ if not photometry_results: logger.warning("No photometry results to generate light curve") return pd.DataFrame() # Convert to records records = [] for result in photometry_results: record = { "obs_id": result.obs_id, "mjd": result.mjd, "flux": result.flux, "flux_error": result.flux_error, "mag_ab": result.mag_ab, "mag_ab_error": result.mag_ab_error, "wavelength": result.wavelength, "bandwidth": result.bandwidth, "band": result.band, "flag": result.flag, "flag_binary": format_flag_binary(result.flag), "pix_x": result.pix_x, "pix_y": result.pix_y, "is_upper_limit": result.is_upper_limit, "snr": result.flux / result.flux_error if result.flux_error > 0 else 0, } records.append(record) # Create DataFrame df = pd.DataFrame(records) # Sort by MJD df = df.sort_values("mjd").reset_index(drop=True) # Add source information as attributes df.attrs["source_ra"] = source.ra df.attrs["source_dec"] = source.dec df.attrs["source_name"] = source.name or f"RA{source.ra:.4f}_Dec{source.dec:.4f}" df.attrs["generated_at"] = datetime.now().isoformat() logger.info(f"Generated light curve with {len(df)} measurements") return df
[docs] def save_lightcurve_csv(df: pd.DataFrame, output_path: Path, include_metadata: bool = True) -> None: """ Save light curve DataFrame to CSV file. Parameters ---------- df : pd.DataFrame Light curve data output_path : Path Path for output CSV file include_metadata : bool Whether to include metadata header """ output_path.parent.mkdir(parents=True, exist_ok=True) if include_metadata and df.attrs: # Write metadata as comments with open(output_path, "w") as f: f.write("# SPHEREx Light Curve Data\n") f.write(f"# Source: {df.attrs.get('source_name', 'Unknown')}\n") f.write(f"# RA: {df.attrs.get('source_ra', 'N/A')}\n") f.write(f"# Dec: {df.attrs.get('source_dec', 'N/A')}\n") f.write(f"# Generated: {df.attrs.get('generated_at', 'N/A')}\n") f.write(f"# Number of observations: {len(df)}\n") f.write("#\n") # Write column descriptions f.write("# Column descriptions:\n") f.write("# - obs_id: Observation identifier\n") f.write("# - mjd: Modified Julian Date\n") f.write("# - flux: Flux density in microJansky (μJy)\n") f.write("# - flux_error: Flux density uncertainty in microJansky (μJy)\n") f.write("# - mag_ab: AB magnitude\n") f.write("# - mag_ab_error: AB magnitude uncertainty\n") f.write("# - wavelength: Central wavelength in microns\n") f.write("# - bandwidth: Bandwidth in microns\n") f.write("# - band: SPHEREx detector band (D1-D6)\n") f.write("# - flag: Combined flag bitmap (integer)\n") f.write("# - flag_binary: Flag bitmap in binary format\n") f.write("# - pix_x, pix_y: Pixel coordinates\n") f.write("# - is_upper_limit: True if flux_error > flux\n") f.write("# - snr: Signal-to-noise ratio\n") f.write("#\n") # Write the data df.to_csv(f, index=False) else: # Simple CSV without metadata df.to_csv(output_path, index=False) logger.info(f"Saved light curve to {output_path}")
[docs] def summarize_lightcurve(df: pd.DataFrame) -> Dict[str, Any]: """ Generate summary statistics for light curve. Parameters ---------- df : pd.DataFrame Light curve data Returns ------- Dict[str, Any] Summary statistics """ if df.empty: return {} summary = { "n_observations": len(df), "n_bands": df["band"].nunique(), "bands": sorted(df["band"].unique()), "mjd_range": (df["mjd"].min(), df["mjd"].max()), "time_span_days": df["mjd"].max() - df["mjd"].min(), "wavelength_range": (df["wavelength"].min(), df["wavelength"].max()), "n_upper_limits": df["is_upper_limit"].sum(), "mean_snr": df["snr"].mean(), "observations_per_band": df["band"].value_counts().to_dict(), } return summary
[docs] def load_lightcurve_from_csv(csv_path: Path) -> List[PhotometryResult]: """ Load photometry results from saved lightcurve CSV file. Parameters ---------- csv_path : Path Path to the lightcurve CSV file Returns ------- List[PhotometryResult] List of photometry measurements """ if not csv_path.exists(): logger.warning(f"Lightcurve CSV file not found: {csv_path}") return [] try: # Read CSV file, skipping comment lines df = pd.read_csv(csv_path, comment="#") # Convert DataFrame back to PhotometryResult objects photometry_results = [] for _, row in df.iterrows(): # Handle AB magnitude columns (may be None/NaN for older CSV files) mag_ab = row.get("mag_ab") mag_ab = None if pd.isna(mag_ab) else float(mag_ab) mag_ab_error = row.get("mag_ab_error") mag_ab_error = None if pd.isna(mag_ab_error) else float(mag_ab_error) result = PhotometryResult( obs_id=row["obs_id"], mjd=float(row["mjd"]), flux=float(row["flux"]), flux_error=float(row["flux_error"]), wavelength=float(row["wavelength"]), bandwidth=float(row["bandwidth"]), band=str(row["band"]), flag=int(row["flag"]), pix_x=float(row["pix_x"]), pix_y=float(row["pix_y"]), mag_ab=mag_ab, mag_ab_error=mag_ab_error, ) # Note: is_upper_limit is a computed property (flux_error > flux), not a constructor parameter photometry_results.append(result) logger.info(f"Loaded {len(photometry_results)} photometry results from {csv_path}") return photometry_results except Exception as e: logger.error(f"Failed to load lightcurve from CSV: {e}") return []