import numpy as np
import healpy as hp
from .baseMetric import BaseMetric
# A collection of metrics which are primarily intended to be used as summary statistics.
__all__ = ['fOArea', 'fONv', 'TableFractionMetric', 'IdentityMetric',
'NormalizeMetric', 'ZeropointMetric', 'TotalPowerMetric']
[docs]class fONv(BaseMetric):
"""
Metrics based on a specified area, but returning NVISITS related to area:
given Asky, what is the minimum and median number of visits obtained over that much area?
(choose the portion of the sky with the highest number of visits first).
Parameters
----------
col : str or list of strs, opt
Name of the column in the numpy recarray passed to the summary metric.
Asky : float, opt
Area of the sky to base the evaluation of number of visits over.
Default 18,0000 sq deg.
nside : int, opt
Nside parameter from healpix slicer, used to set the physical relationship between on-sky area
and number of healpixels. Default 128.
Nvisit : int, opt
Number of visits to use as the benchmark value, if choosing to return a normalized Nvisit value.
norm : boolean, opt
Normalize the returned "nvisit" (min / median) values by Nvisit, if true.
Default False.
metricName : str, opt
Name of the summary metric. Default fONv.
"""
def __init__(self, col='metricdata', Asky=18000., nside=128, Nvisit=825,
norm=False, metricName='fONv', **kwargs):
"""Asky = square degrees """
super().__init__(col=col, metricName=metricName, **kwargs)
self.Nvisit = Nvisit
self.nside = nside
# Determine how many healpixels are included in Asky sq deg.
self.Asky = Asky
self.scale = hp.nside2pixarea(self.nside, degrees=True)
self.npix_Asky = np.int(np.ceil(self.Asky / self.scale))
self.norm = norm
[docs] def run(self, dataSlice, slicePoint=None):
if len(dataSlice) < self.npix_Asky:
return self.badval
name = dataSlice.dtype.names[0]
nvis_sorted = np.sort(dataSlice[name])
# Find the Asky's worth of healpixels with the largest # of visits.
nvis_Asky = nvis_sorted[-self.npix_Asky:]
result = np.empty(2, dtype=[('name', np.str_, 20), ('value', float)])
result['name'][0] = "MedianNvis"
result['value'][0] = np.median(nvis_Asky)
result['name'][1] = "MinNvis"
result['value'][1] = np.min(nvis_Asky)
if self.norm:
result['value'] /= float(self.Nvisit)
return result
[docs]class fOArea(BaseMetric):
"""
Metrics based on a specified number of visits, but returning AREA related to Nvisits:
given Nvisit, what amount of sky is covered with at least that many visits?
Parameters
----------
col : str or list of strs, opt
Name of the column in the numpy recarray passed to the summary metric.
Nvisit : int, opt
Number of visits to use as the minimum required -- metric calculated area that has this many visits.
Default 825.
Asky : float, opt
Area to use as the benchmark value, if choosing to returned a normalized Area value.
Default 18,0000 sq deg.
nside : int, opt
Nside parameter from healpix slicer, used to set the physical relationship between on-sky area
and number of healpixels. Default 128.
norm : boolean, opt
Normalize the returned "area" (area with minimum Nvisit visits) value by Asky, if true.
Default False.
metricName : str, opt
Name of the summary metric. Default fOArea.
"""
def __init__(self, col='metricdata', Nvisit=825, Asky = 18000.0, nside=128,
norm=False, metricName='fOArea', **kwargs):
"""Asky = square degrees """
super().__init__(col=col, metricName=metricName, **kwargs)
self.Nvisit = Nvisit
self.nside = nside
self.Asky = Asky
self.scale = hp.nside2pixarea(self.nside, degrees=True)
self.norm = norm
[docs] def run(self, dataSlice, slicePoint=None):
name = dataSlice.dtype.names[0]
nvis_sorted = np.sort(dataSlice[name])
# Identify the healpixels with more than Nvisits.
nvis_min = nvis_sorted[np.where(nvis_sorted >= self.Nvisit)]
if len(nvis_min) == 0:
result = self.badval
else:
result = nvis_min.size * self.scale
if self.norm:
result /= float(self.Asky)
return result
[docs]class TableFractionMetric(BaseMetric):
"""
Count the completeness (for many fields) and summarize how many fields have given completeness levels
(within a series of bins). Works with completenessMetric only.
This metric is meant to be used as a summary statistic on something like the completeness metric.
The output is DIFFERENT FROM SSTAR and is:
element matching values
0 0 == P
1 0 < P < .1
2 .1 <= P < .2
3 .2 <= P < .3
...
10 .9 <= P < 1
11 1 == P
12 1 < P
Note the 1st and last elements do NOT obey the numpy histogram conventions.
"""
def __init__(self, col='metricdata', nbins=10):
"""
colname = the column name in the metric data (i.e. 'metricdata' usually).
nbins = number of bins between 0 and 1. Should divide evenly into 100.
"""
super(TableFractionMetric, self).__init__(col=col, metricDtype='float')
self.nbins = nbins
# set this so runSliceMetric knows masked values should be set to zero and passed
self.maskVal = 0.
[docs] def run(self, dataSlice, slicePoint=None):
# Calculate histogram of completeness values that fall between 0-1.
goodVals = np.where((dataSlice[self.colname] > 0) & (dataSlice[self.colname] < 1) )
bins = np.arange(self.nbins+1.)/self.nbins
hist, b = np.histogram(dataSlice[self.colname][goodVals], bins=bins)
# Fill in values for exact 0, exact 1 and >1.
zero = np.size(np.where(dataSlice[self.colname] == 0)[0])
one = np.size(np.where(dataSlice[self.colname] == 1)[0])
overone = np.size(np.where(dataSlice[self.colname] > 1)[0])
hist = np.concatenate((np.array([zero]), hist, np.array([one]), np.array([overone])))
# Create labels for each value
binNames = ['0 == P']
binNames.append('0 < P < 0.1')
for i in np.arange(1, self.nbins):
binNames.append('%.2g <= P < %.2g'%(b[i], b[i+1]) )
binNames.append('1 == P')
binNames.append('1 < P')
# Package the names and values up
result = np.empty(hist.size, dtype=[('name', np.str_, 20), ('value', float)])
result['name'] = binNames
result['value'] = hist
return result
[docs]class IdentityMetric(BaseMetric):
"""
Return the metric value itself .. this is primarily useful as a summary statistic for UniSlicer metrics.
"""
[docs] def run(self, dataSlice, slicePoint=None):
if len(dataSlice[self.colname]) == 1:
result = dataSlice[self.colname][0]
else:
result = dataSlice[self.colname]
return result
[docs]class NormalizeMetric(BaseMetric):
"""
Return a metric values divided by 'normVal'. Useful for turning summary statistics into fractions.
"""
def __init__(self, col='metricdata', normVal=1, **kwargs):
super(NormalizeMetric, self).__init__(col=col, **kwargs)
self.normVal = float(normVal)
[docs] def run(self, dataSlice, slicePoint=None):
result = dataSlice[self.colname]/self.normVal
if len(result) == 1:
return result[0]
else:
return result
[docs]class ZeropointMetric(BaseMetric):
"""
Return a metric values with the addition of 'zp'. Useful for altering the zeropoint for summary statistics.
"""
def __init__(self, col='metricdata', zp=0, **kwargs):
super(ZeropointMetric, self).__init__(col=col, **kwargs)
self.zp = zp
[docs] def run(self, dataSlice, slicePoint=None):
result = dataSlice[self.colname] + self.zp
if len(result) == 1:
return result[0]
else:
return result
[docs]class TotalPowerMetric(BaseMetric):
"""
Calculate the total power in the angular power spectrum between lmin/lmax.
"""
def __init__(self, col='metricdata', lmin=100., lmax=300., removeDipole=True, **kwargs):
self.lmin = lmin
self.lmax = lmax
self.removeDipole = removeDipole
super(TotalPowerMetric, self).__init__(col=col, **kwargs)
self.maskVal = hp.UNSEEN
[docs] def run(self, dataSlice, slicePoint=None):
# Calculate the power spectrum.
if self.removeDipole:
cl = hp.anafast(hp.remove_dipole(dataSlice[self.colname], verbose=False))
else:
cl = hp.anafast(dataSlice[self.colname])
ell = np.arange(np.size(cl))
condition = np.where((ell <= self.lmax) & (ell >= self.lmin))[0]
totalpower = np.sum(cl[condition]*(2*ell[condition]+1))
return totalpower