Source code for spxquery.core.download

"""
Parallel download manager for SPHEREx FITS files.
"""

import logging
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import TYPE_CHECKING, List, Optional, Tuple
from urllib.request import Request, urlopen

from tqdm import tqdm

from .config import DownloadResult, ObservationInfo

if TYPE_CHECKING:
    from .config import DownloadConfig

logger = logging.getLogger(__name__)


[docs] def download_file( url: str, output_path: Path, timeout: int = 300, retries: int = 3, retry_delay: int = 5, chunk_size: int = 8192, user_agent: str = "SPXQuery/0.4.1", ) -> DownloadResult: """ Download a single file with retry logic. Parameters ---------- url : str URL to download output_path : Path Local path to save file timeout : int Download timeout in seconds (default: 300) retries : int Number of retry attempts (default: 3) retry_delay : int Delay between retry attempts in seconds (default: 5) chunk_size : int Download chunk size in bytes (default: 8192) user_agent : str User agent string for HTTP requests (default: "SPXQuery/0.4.1") Returns ------- DownloadResult Result of download attempt """ attempt = 0 last_error = None while attempt <= retries: try: # Create output directory if needed output_path.parent.mkdir(parents=True, exist_ok=True) # Set up request with headers request = Request(url, headers={"User-Agent": user_agent}) # Open connection with urlopen(request, timeout=timeout) as response: # Get file size if available total_size = int(response.headers.get("Content-Length", 0)) # Download file with open(output_path, "wb") as f: downloaded = 0 while True: chunk = response.read(chunk_size) if not chunk: break f.write(chunk) downloaded += len(chunk) # Verify download actual_size = output_path.stat().st_size if total_size > 0 and actual_size != total_size: raise RuntimeError(f"Download incomplete: expected {total_size} bytes, got {actual_size}") return DownloadResult(url=url, local_path=output_path, success=True, size_mb=actual_size / 1024 / 1024) except Exception as e: last_error = str(e) logger.warning(f"Download attempt {attempt + 1} failed for {url}: {e}") # Remove partial file if output_path.exists(): output_path.unlink() attempt += 1 if attempt <= retries: time.sleep(retry_delay) # All attempts failed return DownloadResult( url=url, local_path=output_path, success=False, error=f"Failed after {retries + 1} attempts: {last_error}" )
[docs] def parallel_download( download_info: List[Tuple[ObservationInfo, str]], output_dir: Path, max_workers: int = 4, show_progress: bool = True, skip_existing: bool = True, download_config: Optional["DownloadConfig"] = None, ) -> List[DownloadResult]: """ Download multiple files in parallel with progress tracking. Parameters ---------- download_info : List[Tuple[ObservationInfo, str]] List of (observation, download_url) tuples output_dir : Path Directory to save downloaded files max_workers : int Maximum number of parallel downloads (default: 4) show_progress : bool Whether to show progress bar (default: True) skip_existing : bool If True, skip files that already exist. If False, re-download all files (default: True) download_config : DownloadConfig, optional Download configuration with timeout, retries, etc. If None, uses defaults. Returns ------- List[DownloadResult] Results for all download attempts Notes ----- Priority: explicit max_workers parameter > download_config.max_download_workers > default """ from .config import DownloadConfig # Use default config if none provided if download_config is None: download_config = DownloadConfig() # Extract download parameters from config timeout = download_config.timeout retries = download_config.max_retries retry_delay = download_config.retry_delay chunk_size = download_config.chunk_size user_agent = download_config.user_agent logger.info(f"Starting parallel download of {len(download_info)} files") logger.info(f"Download settings: timeout={timeout}s, retries={retries}, workers={max_workers}") # Create output directories output_dir.mkdir(parents=True, exist_ok=True) # Organize downloads by band for band in ["D1", "D2", "D3", "D4", "D5", "D6"]: (output_dir / band).mkdir(exist_ok=True) results = [] # Set up progress bar if show_progress: pbar = tqdm(total=len(download_info), desc="Downloading", unit="files") # Download files in parallel with ThreadPoolExecutor(max_workers=max_workers) as executor: # Submit all downloads future_to_info = {} for obs, url in download_info: # Generate output filename filename = f"{obs.obs_id}.fits" output_path = output_dir / obs.band / filename # Skip if already downloaded (and skip_existing is True) if skip_existing and output_path.exists(): logger.info(f"Skipping {filename} - already exists") results.append( DownloadResult( url=url, local_path=output_path, success=True, size_mb=output_path.stat().st_size / 1024 / 1024 ) ) if show_progress: pbar.update(1) continue # Submit download task with config parameters future = executor.submit( download_file, url, output_path, timeout, retries, retry_delay, chunk_size, user_agent ) future_to_info[future] = (obs, url) # Process completed downloads for future in as_completed(future_to_info): obs, url = future_to_info[future] try: result = future.result() results.append(result) if result.success: logger.info(f"Downloaded {obs.obs_id} ({result.size_mb:.1f} MB)") else: logger.error(f"Failed to download {obs.obs_id}: {result.error}") except Exception as e: logger.error(f"Unexpected error downloading {obs.obs_id}: {e}") results.append( DownloadResult( url=url, local_path=output_dir / obs.band / f"{obs.obs_id}.fits", success=False, error=str(e) ) ) if show_progress: pbar.update(1) if show_progress: pbar.close() # Summary successful = sum(1 for r in results if r.success) failed = len(results) - successful total_size = sum(r.size_mb for r in results if r.success and r.size_mb) logger.info(f"Download complete: {successful} successful, {failed} failed") logger.info(f"Total size: {total_size:.1f} MB") return results