import warnings
import numpy as np
import palpy
from lsst.sims.utils import Site, m5_flat_sed
from .baseStacker import BaseStacker
from builtins import str
__all__ = ['NormAirmassStacker', 'ParallaxFactorStacker', 'HourAngleStacker',
'FilterColorStacker', 'ZenithDistStacker', 'ParallacticAngleStacker',
'SeasonStacker', 'DcrStacker', 'FiveSigmaStacker']
# Original stackers by Peter Yoachim (yoachim@uw.edu)
# Filter color stacker by Lynne Jones (lynnej@uw.edu)
# Season stacker by Phil Marshall (dr.phil.marshall@gmail.com),
# modified by Humna Awan (humna.awan@rutgers.edu)
[docs]class FiveSigmaStacker(BaseStacker):
"""
Calculate the 5-sigma limiting depth for a point source in the given conditions
"""
def __init__(self, airmassCol='airmass', seeingCol='seeingFwhmEff', skybrightnessCol='skyBrightness',
filterCol='filter', exptimeCol='visitExposureTime'):
self.units = ['mag']
self.colsAdded = ['fiveSigmaDepth']
self.colsReq = [airmassCol, seeingCol, skybrightnessCol, filterCol, exptimeCol]
self.airmassCol = airmassCol
self.seeingCol = seeingCol
self.skybrightnessCol = skybrightnessCol
self.filterCol = filterCol
self.exptimeCol = exptimeCol
def _run(self, simData):
filts = np.unique(simData[self.filterCol])
for filtername in filts:
infilt = np.where(simData[self.filterCol] == filtername)
simData['fiveSigmaDepth'][infilt] = m5_flat_sed(filtername,
simData[infilt][self.skybrightnessCol],
simData[infilt][self.seeingCol],
simData[infilt][self.exptimeCol],
simData[infilt][self.airmassCol])
return simData
[docs]class NormAirmassStacker(BaseStacker):
"""Calculate the normalized airmass for each opsim pointing.
"""
def __init__(self, airmassCol='airmass', decCol='fieldDec',
degrees=True, telescope_lat = -30.2446388):
self.units = ['airmass/(minimum possible airmass)']
self.colsAdded = ['normairmass']
self.colsReq = [airmassCol, decCol]
self.airmassCol = airmassCol
self.decCol = decCol
self.telescope_lat = telescope_lat
self.degrees = degrees
def _run(self, simData):
"""Calculate new column for normalized airmass."""
# Run method is required to calculate column.
# Driver runs getColInfo to know what columns are needed from db & which are calculated,
# then gets data from db and then calculates additional columns (via run methods here).
dec = simData[self.decCol]
if self.degrees:
dec = np.radians(dec)
min_z_possible = np.abs(dec - np.radians(self.telescope_lat))
min_airmass_possible = 1./np.cos(min_z_possible)
simData['normairmass'] = simData[self.airmassCol] / min_airmass_possible
return simData
[docs]class ZenithDistStacker(BaseStacker):
"""Calculate the zenith distance for each pointing.
"""
def __init__(self, altCol = 'altitude', degrees=True):
self.altCol = altCol
self.units = ['degrees']
self.colsAdded = ['zenithDistance']
self.colsReq = [self.altCol]
self.degrees = degrees
def _run(self, simData):
"""Calculate new column for zenith distance."""
if self.degrees:
zenithDist = np.pi-np.radians(simData[self.altCol])
else:
zenithDist = np.pi-simData[self.altCol]
simData['zenithDistance'] = np.degrees(zenithDist)
return simData
[docs]class ParallaxFactorStacker(BaseStacker):
"""Calculate the parallax factors for each opsim pointing. Output parallax factor in arcseconds.
"""
def __init__(self, raCol='fieldRA', decCol='fieldDec', dateCol='observationStartMJD', raDecDeg=True):
self.raCol = raCol
self.decCol = decCol
self.dateCol = dateCol
self.units = ['arcsec', 'arcsec']
self.colsAdded = ['ra_pi_amp', 'dec_pi_amp']
self.colsReq = [raCol, decCol, dateCol]
self.raDecDeg = raDecDeg
def _gnomonic_project_toxy(self, RA1, Dec1, RAcen, Deccen):
"""Calculate x/y projection of RA1/Dec1 in system with center at RAcen, Deccenp.
Input radians.
"""
# also used in Global Telescope Network website
cosc = np.sin(Deccen) * np.sin(Dec1) + np.cos(Deccen) * np.cos(Dec1) * np.cos(RA1-RAcen)
x = np.cos(Dec1) * np.sin(RA1-RAcen) / cosc
y = (np.cos(Deccen)*np.sin(Dec1) - np.sin(Deccen)*np.cos(Dec1)*np.cos(RA1-RAcen)) / cosc
return x, y
def _run(self, simData):
ra_pi_amp = np.zeros(np.size(simData), dtype=[('ra_pi_amp', 'float')])
dec_pi_amp = np.zeros(np.size(simData), dtype=[('dec_pi_amp', 'float')])
ra_geo1 = np.zeros(np.size(simData), dtype='float')
dec_geo1 = np.zeros(np.size(simData), dtype='float')
ra_geo = np.zeros(np.size(simData), dtype='float')
dec_geo = np.zeros(np.size(simData), dtype='float')
ra = simData[self.raCol]
dec = simData[self.decCol]
if self.raDecDeg:
ra = np.radians(ra)
dec = np.radians(dec)
for i, ack in enumerate(simData):
mtoa_params = palpy.mappa(2000., simData[self.dateCol][i])
# Object with a 1 arcsec parallax
ra_geo1[i], dec_geo1[i] = palpy.mapqk(ra[i], dec[i],
0., 0., 1., 0., mtoa_params)
# Object with no parallax
ra_geo[i], dec_geo[i] = palpy.mapqk(ra[i], dec[i],
0., 0., 0., 0., mtoa_params)
x_geo1, y_geo1 = self._gnomonic_project_toxy(ra_geo1, dec_geo1,
ra, dec)
x_geo, y_geo = self._gnomonic_project_toxy(ra_geo, dec_geo, ra, dec)
ra_pi_amp[:] = np.degrees(x_geo1-x_geo)*3600.
dec_pi_amp[:] = np.degrees(y_geo1-y_geo)*3600.
simData['ra_pi_amp'] = ra_pi_amp
simData['dec_pi_amp'] = dec_pi_amp
return simData
[docs]class DcrStacker(BaseStacker):
"""Calculate the RA,Dec offset expected for an object due to differential chromatic refraction.
Parameters
----------
filterCol : str
The name of the column with filter names. Default 'fitler'.
altCol : str
Name of the column with altitude info. Default 'altitude'.
raCol : str
Name of the column with RA. Default 'ra_rad'.
decCol : str
Name of the column with Dec. Default 'dec_rad'.
lstCol : str
Name of the column with local sidereal time. Default 'lst'.
site : str or lsst.sims.utils.Site
Name of the observory or a lsst.sims.utils.Site object. Default 'LSST'.
mjdCol : str
Name of column with modified julian date. Default 'observationStartMJD'
dcr_magnitudes : dict
Magitude of the DCR offset for each filter at altitude/zenith distance of 45 degrees.
Defaults u=0.07, g=0.07, r=0.50, i=0.045, z=0.042, y=0.04 (all arcseconds).
Returns
-------
numpy.array
Returns array with additional columns 'ra_dcr_amp' and 'dec_dcr_amp' with the DCR offsets
for each observation. Also runs ZenithDistStacker and ParallacticAngleStacker.
"""
def __init__(self, filterCol='filter', altCol='altitude', raDecDeg=True,
raCol='fieldRA', decCol='fieldDec', lstCol='observationStartLST',
site='LSST', mjdCol='observationStartMJD',
dcr_magnitudes={'u': 0.07, 'g': 0.07, 'r': 0.050, 'i': 0.045, 'z': 0.042, 'y': 0.04}):
self.zdCol = 'zenithDistance'
self.paCol = 'PA'
self.filterCol = filterCol
self.raCol = raCol
self.decCol = decCol
self.dcr_magnitudes = dcr_magnitudes
self.colsAdded = ['ra_dcr_amp', 'dec_dcr_amp', 'zenithDistance', 'PA', 'HA']
self.colsReq = [filterCol, raCol, decCol, altCol, lstCol]
self.units = ['arcsec', 'arcsec']
self.raDecDeg = raDecDeg
self.zstacker = ZenithDistStacker(altCol = altCol)
self.pastacker = ParallacticAngleStacker(raCol=raCol, decCol=decCol, mjdCol=mjdCol,
lstCol=lstCol, site=site)
def _run(self, simData):
# Need to make sure the Zenith stacker gets run first
simData = self.zstacker._run(simData)
simData = self.pastacker._run(simData)
dcr_in_ra = np.tan(simData[self.zdCol])*np.sin(simData[self.paCol])
dcr_in_dec = np.tan(simData[self.zdCol])*np.cos(simData[self.paCol])
for filtername in np.unique(simData[self.filterCol]):
fmatch = np.where(simData[self.filterCol] == filtername)
dcr_in_ra[fmatch] = self.dcr_magnitudes[filtername] * dcr_in_ra[fmatch]
dcr_in_dec[fmatch] = self.dcr_magnitudes[filtername] * dcr_in_dec[fmatch]
simData['ra_dcr_amp'] = dcr_in_ra
simData['dec_dcr_amp'] = dcr_in_dec
return simData
[docs]class HourAngleStacker(BaseStacker):
"""Add the Hour Angle for each observation.
"""
def __init__(self, lstCol='observationStartLST', raCol='fieldRA'):
self.units = ['Hours']
self.colsAdded = ['HA']
self.colsReq = [lstCol, raCol]
self.lstCol = lstCol
self.raCol = raCol
def _run(self, simData):
"""HA = LST - RA """
if len(simData) == 0:
return simData
ra = np.radians(simData[self.raCol])
lst = np.radians(simData[self.lstCol])
# Check that LST is reasonable
if (np.min(lst) < 0) | (np.max(lst) > 2.*np.pi):
warnings.warn('LST values are not between 0 and 2 pi')
# Check that RA is reasonable
if (np.min(ra) < 0) | (np.max(ra) > 2.*np.pi):
warnings.warn('RA values are not between 0 and 2 pi')
ha = lst - ra
# Wrap the results so HA between -pi and pi
ha = np.where(ha < -np.pi, ha + 2. * np.pi, ha)
ha = np.where(ha > np.pi, ha - 2. * np.pi, ha)
# Convert radians to hours
simData['HA'] = ha*12/np.pi
return simData
[docs]class ParallacticAngleStacker(BaseStacker):
"""Add the parallactic angle (in radians) to each visit.
"""
def __init__(self, raCol='fieldRA', decCol='fieldDec', mjdCol='observationStartMJD',
lstCol='observationStartLST', site='LSST'):
self.lstCol = lstCol
self.raCol = raCol
self.decCol = decCol
self.mjdCol = mjdCol
self.site = Site(name=site)
self.units = ['radians']
self.colsAdded = ['PA', 'HA']
self.colsReq = [self.raCol, self.decCol, self.mjdCol, self.lstCol]
self.haStacker = HourAngleStacker(lstCol=lstCol, raCol=raCol)
def _run(self, simData):
# Equation from:
# http://www.gb.nrao.edu/~rcreager/GBTMetrology/140ft/l0058/gbtmemo52/memo52.html
# or
# http://www.gb.nrao.edu/GBT/DA/gbtidl/release2pt9/contrib/contrib/parangle.pro
simData = self.haStacker._run(simData)
dec = np.radians(simData[self.decCol])
simData['PA'] = np.arctan2(np.sin(simData['HA']*np.pi/12.), (np.cos(dec) *
np.tan(self.site.latitude_rad) - np.sin(dec) *
np.cos(simData['HA']*np.pi/12.)))
return simData
[docs]class FilterColorStacker(BaseStacker):
"""Translate filters ('u', 'g', 'r' ..) into RGB tuples.
"""
def __init__(self, filterCol='filter'):
self.filter_rgb_map = {'u': (0, 0, 1), # dark blue
'g': (0, 1, 1), # cyan
'r': (0, 1, 0), # green
'i': (1, 0.5, 0.3), # orange
'z': (1, 0, 0), # red
'y': (1, 0, 1)} # magenta
self.filterMap_filters = [str(f) for f in self.filter_rgb_map]
self.filterCol = filterCol
# self.units used for plot labels
self.units = ['rChan', 'gChan', 'bChan']
# Values required for framework operation: this specifies the names of the new columns.
self.colsAdded = ['rRGB', 'gRGB', 'bRGB']
# Values required for framework operation: this specifies the data columns required from the database.
self.colsReq = [self.filterCol]
def _run(self, simData):
# Translate filter names into numbers.
filtersUsed = np.unique(simData[self.filterCol])
filtersUsed = [str(f) for f in filtersUsed]
if issubclass(type(filtersUsed[0]), bytes):
filtersUsed = [str(f.decode('utf-8')) for f in filtersUsed]
for f in filtersUsed:
if f not in self.filterMap_filters:
raise IndexError('Filter %s not in filter_rgb_map' % (f))
match = np.where(str(simData[self.filterCol]) == str(f))[0]
simData['rRGB'][match] = self.filter_rgb_map[f][0]
simData['gRGB'][match] = self.filter_rgb_map[f][1]
simData['bRGB'][match] = self.filter_rgb_map[f][2]
return simData
[docs]class SeasonStacker(BaseStacker):
"""Add an integer label to show which season a given visit is in.
The season only depends on the RA of the object: we compute the MJD
when each object is on the meridian at midnight, and subtract 6
months to get the start date of each season.
The season index range is 0-10.
Must wrap 0th and 10th to get a total of 10 seasons.
"""
def __init__(self, observationStartMJDCol='observationStartMJD', RACol='fieldRA'):
# Names of columns we want to add.
self.colsAdded = ['year', 'season']
# Names of columns we need from database.
self.colsReq = [observationStartMJDCol, RACol]
# List of units for our new columns.
self.units = ['', '']
# And save the column names.
self.observationStartMJDCol = observationStartMJDCol
self.RACol = RACol
def _run(self, simData):
# Define year number: (note that opsim defines "years" in flat 365 days).
year = np.floor((simData[self.observationStartMJDCol] - simData[self.observationStartMJDCol][0]) / 365)
objRA = simData[self.RACol]/15.0 # in hrs
# objRA=0 on autumnal equinox.
# autumnal equinox 2014 happened on Sept 23 --> Equinox MJD
Equinox = 2456923.5 - 2400000.5
# Use 365.25 for the length of a year here, because we're dealing with real seasons.
daysSinceEquinox = 0.5*objRA*(365.25/12.0) # 0.5 to go from RA to month; 365.25/12.0 months to days
firstSeasonBegan = Equinox + daysSinceEquinox - 0.5*365.25 # in MJD
# Now we can compute the number of years since the first season
# began, and so assign a global integer season number:
globalSeason = np.floor((simData[self.observationStartMJDCol] - firstSeasonBegan)/365.25)
# Subtract off season number of first observation:
season = globalSeason - np.min(globalSeason)
simData['year'] = year
simData['season'] = season
return simData