Source code for romancal.pipeline.mosaic_pipeline

#!/usr/bin/env python
from __future__ import annotations

import logging
from typing import TYPE_CHECKING

import romancal.datamodels.filetype as filetype
from romancal.datamodels import ModelLibrary

# step imports
from romancal.flux import FluxStep
from romancal.outlier_detection import OutlierDetectionStep
from romancal.resample import ResampleStep
from romancal.skymatch import SkyMatchStep
from romancal.source_catalog import SourceCatalogStep

from ..stpipe import RomanPipeline

if TYPE_CHECKING:
    from typing import ClassVar

__all__ = ["MosaicPipeline"]

# Define logging
log = logging.getLogger(__name__)
log.setLevel(logging.DEBUG)


[docs] class MosaicPipeline(RomanPipeline): """ MosaicPipeline: Apply all calibration steps to the roman data to produce level 3 products. Included steps are: ``flux``, ``skymatch``, ``outlier_detection``, ``resample`` and ``source catalog``. """ class_alias = "roman_mos" spec = """ save_results = boolean(default=False) on_disk = boolean(default=False) resample_on_skycell = boolean(default=True) """ # Define aliases to steps step_defs: ClassVar = { "flux": FluxStep, "skymatch": SkyMatchStep, "outlier_detection": OutlierDetectionStep, "resample": ResampleStep, "source_catalog": SourceCatalogStep, } # start the actual processing
[docs] def process(self, input_data): """Process the Roman WFI data from Level 2 to Level 3""" log.info("Starting Roman mosaic level calibration pipeline ...") # open the input file file_type = filetype.check(input_data) if file_type == "asn": library = ModelLibrary(input_data, on_disk=self.on_disk) elif file_type == "ModelLibrary": library = input_data else: raise TypeError( "The level three pipeline input needs to be an association or ModelLibrary" ) # propagate resample_on_skycell setting self.outlier_detection.resample_on_skycell = self.resample_on_skycell self.resample.resample_on_skycell = self.resample_on_skycell self.flux.suffix = "flux" result = self.flux.run(library) self.skymatch.suffix = "skymatch" result = self.skymatch.run(result) self.outlier_detection.suffix = "outlier_detection" result = self.outlier_detection.run(result) self.resample.suffix = "coadd" self.output_file = library.asn["products"][0]["name"] result = self.resample.run(result) self.source_catalog.output_file = self.output_file self.source_catalog.run(result) self.suffix = "coadd" return result