"""
Background estimation for SPHEREx photometry.
This module provides both window-based and annulus-based background estimation
methods for local background subtraction in aperture photometry.
"""
import logging
from typing import Tuple, Union
import numpy as np
logger = logging.getLogger(__name__)
[docs]
def fast_sigma_clip(data: np.ndarray, sigma: float = 3.0, maxiters: int = 3):
"""Sigma-clipped stats using numpy directly (avoids astropy object overhead).
Parameters
----------
data : np.ndarray
Input data (will be flattened internally).
sigma : float
Clipping threshold in standard deviations.
maxiters : int
Maximum clipping iterations.
Returns
-------
mean, median, std : float, float, float
"""
clipped = data.ravel()
for _ in range(maxiters):
m = np.mean(clipped)
s = np.std(clipped)
if s == 0:
break
mask = np.abs(clipped - m) <= sigma * s
if mask.all():
break
clipped = clipped[mask]
return float(np.mean(clipped)), float(np.median(clipped)), float(np.std(clipped))
[docs]
def estimate_window_background(
image: np.ndarray,
variance: np.ndarray,
flags: np.ndarray,
x: float,
y: float,
window_size: Union[int, Tuple[int, int]],
aperture_radius: float,
min_usable_pixels: int = 10,
bg_sigma_clip_sigma: float = 3.0,
bg_sigma_clip_maxiters: int = 3,
) -> Tuple[float, float, int]:
"""
Estimate local background using a rectangular window around the source.
This method uses a rectangular region around the source with automatic
exclusion of pixels intersecting the aperture. It is simpler than
annulus-based methods and can be more robust in crowded fields.
Parameters
----------
image : np.ndarray
Image data
variance : np.ndarray
Variance array
flags : np.ndarray
Flag bitmap array
x, y : float
Source coordinates (center of window)
window_size : int or tuple of (height, width)
Size of the background window in pixels.
If int: creates square window of size window_size × window_size
If tuple: (height, width) for rectangular window
aperture_radius : float
Aperture radius in pixels. All pixels intersecting this aperture
are excluded from background estimation.
min_usable_pixels : int
Minimum number of unflagged pixels required
bg_sigma_clip_sigma : float
Sigma threshold for sigma clipping
bg_sigma_clip_maxiters : int
Maximum iterations for sigma clipping
Returns
-------
background_level : float
Background level per pixel
background_error : float
Background error per pixel
n_usable : int
Number of usable pixels in window
Notes
-----
- If window extends beyond image boundaries, it is clipped to image edges
with a warning (not an error)
- Pixels with flags set are excluded from background estimation
- Pixels intersecting the aperture are excluded (using conservative estimate)
- Sigma clipping is used to reject outliers before computing statistics
"""
# Parse window size
if isinstance(window_size, int):
win_height = win_width = window_size
else:
win_height, win_width = window_size
# Validate window size
if win_height <= 0 or win_width <= 0:
raise ValueError(f"window_size must be > 0, got {window_size}")
# Calculate window boundaries (half-widths from center)
half_height = win_height / 2.0
half_width = win_width / 2.0
# Window boundaries in pixel coordinates
y_min_float = y - half_height
y_max_float = y + half_height
x_min_float = x - half_width
x_max_float = x + half_width
# Convert to integer indices (0-based)
y_min = int(np.floor(y_min_float))
y_max = int(np.ceil(y_max_float))
x_min = int(np.floor(x_min_float))
x_max = int(np.ceil(x_max_float))
# Get image dimensions
ny, nx = image.shape
# Clip window to image boundaries
x_min_clipped = max(0, x_min)
x_max_clipped = min(nx, x_max)
y_min_clipped = max(0, y_min)
y_max_clipped = min(ny, y_max)
# Warn if window was clipped
if x_min_clipped != x_min or x_max_clipped != x_max or y_min_clipped != y_min or y_max_clipped != y_max:
logger.warning(
f"Background window [{x_min}:{x_max}, {y_min}:{y_max}] "
f"exceeds image boundaries, clipped to [{x_min_clipped}:{x_max_clipped}, {y_min_clipped}:{y_max_clipped}]"
)
# Extract window region
window_slice = (slice(y_min_clipped, y_max_clipped), slice(x_min_clipped, x_max_clipped))
image_window = image[window_slice]
variance_window = variance[window_slice]
flags_window = flags[window_slice]
# Create mask for usable pixels (unflagged)
from ..utils.spherex_mef import create_background_mask
clean_mask = create_background_mask(flags_window)
# Exclude pixels intersecting with aperture
# A pixel at (i,j) represents a square from [i-0.5, i+0.5] × [j-0.5, j+0.5]
# To ensure we exclude ALL pixels with any intersection, use conservative estimate:
# distance from aperture center to pixel center < aperture_radius + sqrt(2)/2
# where sqrt(2)/2 ≈ 0.707 is half the diagonal of a unit square pixel
yy, xx = np.ogrid[y_min_clipped:y_max_clipped, x_min_clipped:x_max_clipped]
distances = np.sqrt((xx - x) ** 2 + (yy - y) ** 2)
aperture_exclusion_mask = distances > (aperture_radius + 0.707)
# Combine masks: clean pixels AND outside aperture
usable_mask = clean_mask & aperture_exclusion_mask
n_usable = np.sum(usable_mask)
# Check if we have enough pixels
if n_usable < min_usable_pixels:
logger.warning(
f"Insufficient usable pixels with strict masking: {n_usable}/{min_usable_pixels} required. "
f"Trying with relaxed masking (exclude_bad_only=False)."
)
# Try with relaxed masking
clean_mask = create_background_mask(flags_window, False)
usable_mask = clean_mask & aperture_exclusion_mask
n_usable = np.sum(usable_mask)
if n_usable < min_usable_pixels:
logger.error(
f"Insufficient usable pixels in background window even with relaxed masking: "
f"{n_usable}/{min_usable_pixels} required "
f"(window: {win_height}×{win_width}, aperture_radius: {aperture_radius:.1f})"
)
return 0.0, 0.0, 0
# Extract background pixels
bg_pixels = image_window[usable_mask]
bg_variance = variance_window[usable_mask]
# Calculate background statistics using sigma-clipped mean
bg_mean, bg_median, bg_std = fast_sigma_clip(bg_pixels, sigma=bg_sigma_clip_sigma, maxiters=bg_sigma_clip_maxiters)
# Error on the mean background
bg_error = bg_std / np.sqrt(n_usable)
logger.debug(
f"Window background estimate: {bg_median:.6f} ± {bg_error:.6f} from {n_usable} pixels "
f"(window: {win_height}×{win_width}, aperture_radius: {aperture_radius:.1f})"
)
return float(bg_median), float(bg_error), n_usable
[docs]
def determine_annulus_radii(
aperture_radius: float,
min_annulus_area_pixels: int = 10,
max_outer_radius: float = 5.0,
annulus_inner_offset: float = 1.414,
) -> Tuple[float, float]:
"""
Determine inner and outer radii for background annulus.
Inner and outer radii are calculated automatically based on aperture size
and configuration parameters.
Parameters
----------
aperture_radius : float
Source aperture radius in pixels
min_annulus_area_pixels : int
Minimum annulus area in pixels
max_outer_radius : float
Maximum allowed outer radius
annulus_inner_offset : float
Offset from aperture edge to inner annulus radius
Returns
-------
inner_radius, outer_radius : float, float
Annulus inner and outer radii in pixels
Notes
-----
Inner radius is calculated as: aperture_radius + annulus_inner_offset
Outer radius is calculated to achieve minimum annulus area, capped at max_outer_radius.
"""
# Calculate inner radius (offset pixels larger than aperture radius)
inner_radius = aperture_radius + annulus_inner_offset
# Calculate outer radius to achieve minimum annulus area
# Area of annulus = π(r_out² - r_in²)
# Solve for r_out: r_out = sqrt(area/π + r_in²)
target_outer_radius = np.sqrt(min_annulus_area_pixels / np.pi + inner_radius**2)
outer_radius = min(target_outer_radius, max_outer_radius)
logger.debug(f"Annulus radii: inner={inner_radius:.2f}, outer={outer_radius:.2f}")
return inner_radius, outer_radius
[docs]
def create_annulus_mask(
image_shape: Tuple[int, int], x: float, y: float, inner_radius: float, outer_radius: float
) -> np.ndarray:
"""
Create boolean mask for annular region.
Parameters
----------
image_shape : tuple
Shape of image (ny, nx)
x, y : float
Center coordinates
inner_radius, outer_radius : float
Annulus radii in pixels
Returns
-------
np.ndarray
Boolean mask (True = within annulus)
"""
ny, nx = image_shape
yy, xx = np.ogrid[:ny, :nx]
# Distance from center
distances = np.sqrt((xx - x) ** 2 + (yy - y) ** 2)
# Annulus mask (between inner and outer radii)
mask = (distances >= inner_radius) & (distances <= outer_radius)
return mask
[docs]
def estimate_local_background(
image: np.ndarray,
variance: np.ndarray,
flags: np.ndarray,
x: float,
y: float,
aperture_radius: float,
min_usable_pixels: int = 10,
max_outer_radius: float = 5.0,
bg_sigma_clip_sigma: float = 3.0,
bg_sigma_clip_maxiters: int = 3,
max_annulus_attempts: int = 5,
annulus_expansion_step: float = 0.5,
annulus_inner_offset: float = 1.414,
) -> Tuple[float, float, int]:
"""
Estimate local background using annular region around source.
Annulus radii are calculated automatically based on aperture size and
configuration parameters.
Parameters
----------
image : np.ndarray
Image data
variance : np.ndarray
Variance array
flags : np.ndarray
Flag array
x, y : float
Source coordinates
aperture_radius : float
Source aperture radius
min_usable_pixels : int
Minimum number of usable pixels required
max_outer_radius : float
Maximum outer radius
bg_sigma_clip_sigma : float
Sigma threshold for sigma clipping
bg_sigma_clip_maxiters : int
Maximum iterations for sigma clipping
max_annulus_attempts : int
Maximum attempts to expand annulus
annulus_expansion_step : float
Step size for annulus expansion
annulus_inner_offset : float
Offset from aperture edge to inner annulus
Returns
-------
background_level : float
Background level per pixel (MJy/sr)
background_error : float
Background error per pixel (MJy/sr)
n_usable : int
Number of usable pixels in annulus
Notes
-----
Inner radius = aperture_radius + annulus_inner_offset
Outer radius is calculated to achieve minimum annulus area, capped at max_outer_radius.
"""
from ..utils.spherex_mef import create_background_mask
# Determine annulus radii automatically
inner_r, outer_r = determine_annulus_radii(
aperture_radius, min_usable_pixels, max_outer_radius, annulus_inner_offset
)
# Check if annulus fits within image
ny, nx = image.shape
max_distance = min(x, y, nx - x, ny - y)
if outer_r > max_distance:
logger.warning(f"Outer radius {outer_r:.2f} exceeds image boundary {max_distance:.2f}")
outer_r = max_distance
# If outer radius was reduced, inner radius might need adjustment too
if inner_r >= outer_r:
inner_r = outer_r * 0.7 # Make inner radius 70% of outer radius
# Try progressively larger outer radii until we get enough usable pixels
attempt = 0
while attempt < max_annulus_attempts:
# Create annulus mask
annulus_mask = create_annulus_mask(image.shape, x, y, inner_r, outer_r)
# Create clean background mask (no flagged pixels)
clean_mask = create_background_mask(flags)
# Combine masks
usable_mask = annulus_mask & clean_mask
n_usable = np.sum(usable_mask)
logger.debug(f"Attempt {attempt + 1}: annulus area={np.sum(annulus_mask)}, usable={n_usable}")
# Check if we have enough usable pixels
if n_usable >= min_usable_pixels:
break
# If this is the last attempt and we still don't have enough pixels, try relaxed masking
if attempt == max_annulus_attempts - 1:
logger.warning(
f"Insufficient usable pixels with strict masking: {n_usable}/{min_usable_pixels} required. "
f"Trying with relaxed masking (exclude_bad_only=False)."
)
clean_mask = create_background_mask(flags, False)
usable_mask = annulus_mask & clean_mask
n_usable = np.sum(usable_mask)
if n_usable >= min_usable_pixels:
break
# Expand outer radius if possible
if outer_r < max_outer_radius and outer_r < max_distance:
new_outer_r = min(outer_r + annulus_expansion_step, max_outer_radius, max_distance)
if new_outer_r > outer_r:
outer_r = new_outer_r
attempt += 1
continue
# Cannot expand further
break
# Extract background pixels
if n_usable == 0:
logger.error("No usable pixels in background annulus")
return 0.0, 0.0, 0
bg_pixels = image[usable_mask]
bg_variance = variance[usable_mask]
# Calculate background statistics using sigma-clipped mean
bg_mean, bg_median, bg_std = fast_sigma_clip(bg_pixels, sigma=bg_sigma_clip_sigma, maxiters=bg_sigma_clip_maxiters)
# Error on the mean background
bg_error = bg_std / np.sqrt(n_usable)
logger.debug(
f"Background estimate: {bg_median:.6f} ± {bg_error:.6f} MJy/sr "
f"from {n_usable} pixels (r={inner_r:.1f}-{outer_r:.1f})"
)
return float(bg_median), float(bg_error), n_usable