Source code for vespa.stars.populations

from __future__ import print_function, division

import logging
import re, os, os.path

on_rtd = os.environ.get('READTHEDOCS') == 'True'

if not on_rtd:
    import numpy as np
    import matplotlib.pyplot as plt
    import pandas as pd
    import scipy.stats as stats
    import numpy.random as rand

    from astropy import units as u
    from astropy.units import Quantity
    from astropy.coordinates import SkyCoord

else:
    #hacking...
    class np(object):
        inf = 1
    plt = None
    pd = None
    stats = None
    rand = None
    u = None
    Quantity = None
    SkyCoord = None

from ..orbits import OrbitPopulation
from ..orbits import TripleOrbitPopulation
from ..plotutils import setfig,plot2dhist

try:
    from isochrones.starmodel import StarModel, BinaryStarModel
    from isochrones.starmodel import TripleStarModel
    from isochrones import get_ichrone
except ImportError:
    StarModel, BinaryStarModel, TripleStarModel = (None, None, None)

try:
    from simpledist import distributions as dists
except ImportError:
    dists = None

from ..hashutils import hashcombine, hashdict, hashdf

from .constraints import Constraint,UpperLimit,LowerLimit,JointConstraintOr
from .constraints import ConstraintDict,MeasurementConstraint,RangeConstraint
from .contrastcurve import ContrastCurveConstraint,VelocityContrastCurveConstraint
from .contrastcurve import ContrastCurveFromFile

from .utils import randpos_in_circle, draw_raghavan_periods
from .utils import draw_msc_periods, draw_eccs
from .utils import flat_massratio, mult_masses
from .utils import distancemodulus, addmags, dfromdm

from .trilegal import get_trilegal

try:
    from isochrones import get_ichrone
except ImportError:
    logging.warning('isochrones package not installed; population simulations will not be fully functional')
    DARTMOUTH = None

BANDS = ['g','r','i','z','J','H','K','Kepler']

[docs]class StarPopulation(object): """A population of stars. This object contains information of a simulated population of stars. It has a flexible purpose-- it could represent many random realizations of a single system, or it could also represent many different random systems. This is the general base class; subclasses include, e.g., :class:`MultipleStarPopulation` and :class:`BGStarPopulation_TRILEGAL`. The :attr:`StarPopulation.stars` attribute is a :class:`pandas.DataFrame` containing all the information about all the random realizations, such as the physical star properties (mass, radius, etc.) and observational characteristics (magnitudes in different bands). The :attr:`StarPopulation.orbpop` attribute stores information about the orbits of the random stars, if such a thing is relevant for the population in question (such as, e.g., a :class:`MultipleStarPopulation`). If orbits are relevant, then attributes such as :attr:`StarPopulation.Rsky`, :attr:`StarPopulation.RV`, and :func:`StarPopulation.dmag` are defined as well. Importantly, you can apply constraints to a :class:`StarPopulation`, implemented via the :class:`Constraint` class. You can constrain properties of the stars to be within a given range, you can apply a :class:`ContrastCurveConstraint`, simulating the exclusion curve of an imaging observation, and many others. You can save and re-load :class:`StarPopulation` objects using :func:`StarPopulation.save_hdf` and :func:`StarPopulation.load_hdf`. .. warning:: Support for saving constraints is planned and partially implemented but untested. Any subclass must be able to be initialized with no arguments, with no calculations being done; this enables the way that :func:`StarPopulation.load_hdf` is implemented to work properly. :param stars: (:class:`pandas.DataFrame`, optional) Table containing properties of stars. Magnitude properties end with "_mag". Default is that these magnitudes are absolute, and get converted to apparent magnitudes based on distance, which is either provided or randomly assigned. :param distance: If ``None``, then distances of stars are assigned randomly out to max_distance, or by comparing to mags. If float, then assumed to be in parsec. Or, if stars already has a distance column, this is ignored. :type distance: :class:`astropy.units.Quantity`, float, or array-like, optional :param max_distance: ``Quantity`` or float, optional Max distance out to which distances will be simulated, according to random placements in volume ($p(d)\simd^2$). Ignored if stars already has a distance column. :type max_distance: :class:`astropy.units.Quantity` or float, optional :param convert_absmags: (``bool``, optional) If ``True``, then magnitudes in ``stars`` will be converted to apparent magnitudes based on distance. If ``False,`` then magnitudes will be kept as-is. Ignored if stars already has a distance column. :param orbpop: Describes the orbits of the stars. :type orbpop: :class:`orbits.OrbitPopulation` """ def __init__(self,stars=None,distance=None, max_distance=1000,convert_absmags=True, name='', orbpop=None, mags=None): self.orbpop = orbpop self.name = name if stars is None: self.stars = None else: self.stars = stars.copy() N = len(self.stars) #if stars does not have a 'distance' column already, then # we define distances based on the provided arguments, # and covert absolute magnitudes into apparent (unless explicitly # forbidden from doing so by 'convert_absmags' being set # to False). if 'distance' not in self.stars: if type(max_distance) != Quantity: max_distance = max_distance * u.pc if distance is None: #generate random distances dmax = max_distance.to('pc').value #distance_distribution = dists.PowerLaw_Distribution(2.,1,dmax) # p(d)~d^2 #distance = distance_distribution.rvs(N) # p(d) ~ d^2 distance = stats.powerlaw(3).rvs(N) * dmax if type(distance) != Quantity: distance = distance * u.pc distmods = distancemodulus(distance) if convert_absmags: for col in self.stars.columns: if re.search('_mag',col): self.stars[col] += distmods self.stars['distance'] = distance self.stars['distmod'] = distmods if 'distmod' not in self.stars: self.stars['distmod'] = distancemodulus(self.stars['distance']) @property def starmodel(self): if hasattr(self, '_starmodel'): return self._starmodel else: return AttributeError('No starmodel for this object.') @property def Rsky(self): """ Projected angular distance between "primary" and "secondary" (exact meaning varies) """ r = (self.orbpop.Rsky/self.distance) return r.to('arcsec',equivalencies=u.dimensionless_angles()) @property def RV(self): """ Radial velocity difference between "primary" and "secondary" (exact meaning varies) """ return self.orbpop.RV
[docs] def dRV(self,dt): """ Change in RV between two epochs separated by dt :param dt: Time difference between two epochs, either :class:`astropy.units.Quantity` or days. :return: Change in RV. """ return self.orbpop.dRV(dt)
[docs] def dmag(self, band): """ Magnitude difference between "primary" and "secondary" in given band Exact definition will depend on context. Only legit if ``self.mags`` is defined (i.e., not ``None``). :param band: (``string``) Desired photometric bandpass. """ if self.mags is None: raise ValueError('This population does not have a "mags" attribute ' + 'defined; dmags is meaningless.') return self.stars['{}_mag'.format(band)] - self.mags[band]
[docs] def append(self, other): """Appends stars from another StarPopulations, in place. :param other: Another :class:`StarPopulation`; must have same columns as ``self``. """ if not isinstance(other,StarPopulation): raise TypeError('Only StarPopulation objects can be appended to a StarPopulation.') if not np.all(self.stars.columns == other.stars.columns): raise ValueError('Two populations must have same columns to combine them.') if len(self.constraints) > 0: logging.warning('All constraints are cleared when appending another population.') self.stars = pd.concat((self.stars, other.stars)) if self.orbpop is not None and other.orbpop is not None: self.orbpop = self.orbpop + other.orbpop
def __getitem__(self,prop): return self.selected[prop] def __hash__(self): return hashcombine(self.constraints, hashdf(self.stars), self.orbpop) def __eq__(self, other): return hash(self)==hash(other)
[docs] def generate(self, *args, **kwargs): """ Function that generates population. """ raise NotImplementedError
@property def is_ruled_out(self): """ Will be ``True`` if contraints rule out all (or all but one) instances """ if hasattr(self,'is_empty'): return self.is_empty else: return self.distok.sum() < 2 @property def bands(self): """ Bandpasses for which StarPopulation has magnitude data """ bands = [] for c in self.stars.columns: if re.search('_mag',c): bands.append(c) return bands @property def distance(self): """Distance to stars. """ return np.array(self.stars['distance'])*u.pc @distance.setter def distance(self,value): """New distance value must be a ``Quantity`` object """ self.stars['distance'] = value.to('pc').value old_distmod = self.stars['distmod'].copy() new_distmod = distancemodulus(self.stars['distance']) for m in self.bands: self.stars[m] += new_distmod - old_distmod self.stars['distmod'] = new_distmod logging.warning('Setting the distance manually may have screwed up your constraints. Re-apply constraints as necessary.') @property def distok(self): """ Boolean array showing which stars pass all distribution constraints. A "distribution constraint" is a constraint that affects the distribution of stars, rather than just the number. """ ok = np.ones(len(self.stars)).astype(bool) for name in self.constraints: c = self.constraints[name] if c.name not in self.distribution_skip: ok &= c.ok return ok @property def countok(self): """ Boolean array showing which stars pass all count constraints. A "count constraint" is a constraint that affects the number of stars. """ ok = np.ones(len(self.stars)).astype(bool) for name in self.constraints: c = self.constraints[name] if c.name not in self.selectfrac_skip: ok &= c.ok return ok @property def selected(self): """ All stars that pass all distribution constraints. """ return self.stars[self.distok] @property def selectfrac(self): """ Fraction of stars that pass count constraints. """ return self.countok.sum()/len(self.stars)
[docs] def prophist2d(self,propx,propy, mask=None, logx=False,logy=False, fig=None,selected=False,**kwargs): """Makes a 2d density histogram of two given properties :param propx,propy: Names of properties to histogram. Must be names of columns in ``self.stars`` table. :param mask: (optional) Boolean mask (``True`` is good) to say which indices to plot. Must be same length as ``self.stars``. :param logx,logy: (optional) Whether to plot the log10 of x and/or y properties. :param fig: (optional) Argument passed to :func:`plotutils.setfig`. :param selected: (optional) If ``True``, then only the "selected" stars (that is, stars obeying all distribution constraints attached to this object) will be plotted. In this case, ``mask`` will be ignored. :param kwargs: Additional keyword arguments passed to :func:`plotutils.plot2dhist`. """ if mask is not None: inds = np.where(mask)[0] else: if selected: inds = self.selected.index else: inds = self.stars.index if selected: xvals = self.selected[propx].iloc[inds].values yvals = self.selected[propy].iloc[inds].values else: if mask is None: mask = np.ones_like(self.stars.index) xvals = self.stars[mask][propx].values yvals = self.stars[mask][propy].values #forward-hack for EclipsePopulations... #TODO: reorganize. if propx=='depth' and hasattr(self,'depth'): xvals = self.depth.iloc[inds].values if propy=='depth' and hasattr(self,'depth'): yvals = self.depth.iloc[inds].values if logx: xvals = np.log10(xvals) if logy: yvals = np.log10(yvals) plot2dhist(xvals,yvals,fig=fig,**kwargs) plt.xlabel(propx) plt.ylabel(propy)
[docs] def prophist(self,prop,fig=None,log=False, mask=None, selected=False,**kwargs): """Plots a 1-d histogram of desired property. :param prop: Name of property to plot. Must be column of ``self.stars``. :param fig: (optional) Argument for :func:`plotutils.setfig` :param log: (optional) Whether to plot the histogram of log10 of the property. :param mask: (optional) Boolean array (length of ``self.stars``) to say which indices to plot (``True`` is good). :param selected: (optional) If ``True``, then only the "selected" stars (that is, stars obeying all distribution constraints attached to this object) will be plotted. In this case, ``mask`` will be ignored. :param **kwargs: Additional keyword arguments passed to :func:`plt.hist`. """ setfig(fig) inds = None if mask is not None: inds = np.where(mask)[0] elif inds is None: if selected: #inds = np.arange(len(self.selected)) inds = self.selected.index else: #inds = np.arange(len(self.stars)) inds = self.stars.index if selected: vals = self.selected[prop].values#.iloc[inds] #invalidates mask? else: vals = self.stars[prop].iloc[inds].values if prop=='depth' and hasattr(self,'depth'): vals *= self.dilution_factor[inds] if log: h = plt.hist(np.log10(vals),**kwargs) else: h = plt.hist(vals,**kwargs) plt.xlabel(prop)
[docs] def constraint_stats(self,primarylist=None): """Returns information about effect of constraints on population. :param primarylist: List of constraint names that you want specific information on (i.e., not blended within "multiple constraints".) :return: ``dict`` of what percentage of population is ruled out by each constraint, including a "multiple constraints" entry. """ if primarylist is None: primarylist = [] n = len(self.stars) primaryOK = np.ones(n).astype(bool) tot_reject = np.zeros(n) for name in self.constraints: if name in self.selectfrac_skip: continue c = self.constraints[name] if name in primarylist: primaryOK &= c.ok tot_reject += ~c.ok primary_rejected = ~primaryOK secondary_rejected = tot_reject - primary_rejected lone_reject = {} for name in self.constraints: if name in primarylist or name in self.selectfrac_skip: continue c = self.constraints[name] lone_reject[name] = ((secondary_rejected==1) & (~primary_rejected) & (~c.ok)).sum()/float(n) mult_rejected = (secondary_rejected > 1) & (~primary_rejected) not_rejected = ~(tot_reject.astype(bool)) primary_reject_pct = primary_rejected.sum()/float(n) mult_reject_pct = mult_rejected.sum()/float(n) not_reject_pct = not_rejected.sum()/float(n) tot = 0 results = {} results['pri'] = primary_reject_pct tot += primary_reject_pct for name in lone_reject: results[name] = lone_reject[name] tot += lone_reject[name] results['multiple constraints'] = mult_reject_pct tot += mult_reject_pct results['remaining'] = not_reject_pct tot += not_reject_pct if tot != 1: logging.warning('total adds up to: %.2f (%s)' % (tot,self.model)) return results
[docs] def constraint_piechart(self,primarylist=None, fig=None,title='',colordict=None, legend=True,nolabels=False): """Makes piechart illustrating constraints on population :param primarylist: (optional) List of most import constraints to show (see :func:`StarPopulation.constraint_stats`) :param fig: (optional) Passed to :func:`plotutils.setfig`. :param title: (optional) Title for pie chart :param colordict: (optional) Dictionary describing colors (keys are constraint names). :param legend: (optional) ``bool`` indicating whether to display a legend. :param nolabels: (optional) If ``True``, then leave out legend labels. """ setfig(fig,figsize=(6,6)) stats = self.constraint_stats(primarylist=primarylist) if primarylist is None: primarylist = [] if len(primarylist)==1: primaryname = primarylist[0] else: primaryname = '' for name in primarylist: primaryname += '%s,' % name primaryname = primaryname[:-1] fracs = [] labels = [] explode = [] colors = [] fracs.append(stats['remaining']*100) labels.append('remaining') explode.append(0.05) colors.append('b') if 'pri' in stats and stats['pri']>=0.005: fracs.append(stats['pri']*100) labels.append(primaryname) explode.append(0) if colordict is not None: colors.append(colordict[primaryname]) for name in stats: if name == 'pri' or \ name == 'multiple constraints' or \ name == 'remaining': continue fracs.append(stats[name]*100) labels.append(name) explode.append(0) if colordict is not None: colors.append(colordict[name]) if stats['multiple constraints'] >= 0.005: fracs.append(stats['multiple constraints']*100) labels.append('multiple constraints') explode.append(0) colors.append('w') autopct = '%1.1f%%' if nolabels: labels = None if legend: legendlabels = [] for i,l in enumerate(labels): legendlabels.append('%s (%.1f%%)' % (l,fracs[i])) labels = None autopct = '' if colordict is None: plt.pie(fracs,labels=labels,autopct=autopct,explode=explode) else: plt.pie(fracs,labels=labels,autopct=autopct,explode=explode, colors=colors) if legend: plt.legend(legendlabels,bbox_to_anchor=(-0.05,0), loc='lower left',prop={'size':10}) plt.title(title)
@property def selectfrac_skip(self): """ Names of constraints that should *not* be considered for counting purposes """ try: return self._selectfrac_skip except AttributeError: self._selectfrac_skip = [] return self._selectfrac_skip @selectfrac_skip.setter def selectfrac_skip(self, value): self._selectfrac_skip = value @property def distribution_skip(self): """ Names of constraints that should *not* be considered for distribution purposes """ try: return self._distribution_skip except AttributeError: self._distribution_skip = [] return self._distribution_skip @distribution_skip.setter def distribution_skip(self, value): self._distribution_skip = value @property def constraints(self): """ Constraints applied to the population. """ try: return self._constraints except AttributeError: self._constraints = ConstraintDict() return self._constraints @constraints.setter def constraints(self, value): self._constraints = value @property def hidden_constraints(self): """ Constraints applied to the population, but temporarily removed. """ try: return self._hidden_constraints except AttributeError: self._hidden_constraints = ConstraintDict() return self._hidden_constraints @hidden_constraints.setter def hidden_constraints(self, value): self._hidden_constraints = value
[docs] def apply_constraint(self,constraint,selectfrac_skip=False, distribution_skip=False,overwrite=False): """Apply a constraint to the population :param constraint: Constraint to apply. :type constraint: :class:`Constraint` :param selectfrac_skip: (optional) If ``True``, then this constraint will not be considered towards diminishing the """ #grab properties constraints = self.constraints my_selectfrac_skip = self.selectfrac_skip my_distribution_skip = self.distribution_skip if constraint.name in constraints and not overwrite: logging.warning('constraint already applied: {}'.format(constraint.name)) return constraints[constraint.name] = constraint if selectfrac_skip: my_selectfrac_skip.append(constraint.name) if distribution_skip: my_distribution_skip.append(constraint.name) #forward-looking for EclipsePopulation if hasattr(self, '_make_kde'): self._make_kde() self.constraints = constraints self.selectfrac_skip = my_selectfrac_skip self.distribution_skip = my_distribution_skip
#self._apply_all_constraints()
[docs] def replace_constraint(self,name,selectfrac_skip=False,distribution_skip=False): """ Re-apply constraint that had been removed :param name: Name of constraint to replace :param selectfrac_skip,distribution_skip: (optional) Same as :func:`StarPopulation.apply_constraint` """ hidden_constraints = self.hidden_constraints if name in hidden_constraints: c = hidden_constraints[name] self.apply_constraint(c,selectfrac_skip=selectfrac_skip, distribution_skip=distribution_skip) del hidden_constraints[name] else: logging.warning('Constraint {} not available for replacement.'.format(name)) self.hidden_constraints = hidden_constraints
[docs] def remove_constraint(self,name): """ Remove a constraint (make it "hidden") :param name: Name of constraint. """ constraints = self.constraints hidden_constraints = self.hidden_constraints my_distribution_skip = self.distribution_skip my_selectfrac_skip = self.selectfrac_skip if name in constraints: hidden_constraints[name] = constraints[name] del constraints[name] if name in self.distribution_skip: my_distribution_skip.remove(name) if name in self.selectfrac_skip: my_selectfrac_skip.remove(name) #self._apply_all_constraints() else: logging.warning('Constraint {} does not exist.'.format(name)) self.constraints = constraints self.hidden_constraints = hidden_constraints self.selectfrac_skip = my_selectfrac_skip self.distribution_skip = my_distribution_skip
[docs] def constrain_property(self,prop,lo=-np.inf,hi=np.inf, measurement=None,thresh=3, selectfrac_skip=False,distribution_skip=False): """Apply constraint that constrains property. :param prop: Name of property. Must be column in ``self.stars``. :type prop: ``str`` :param lo,hi: (optional) Low and high allowed values for ``prop``. Defaults to ``-np.inf`` and ``np.inf`` to allow for defining only lower or upper limits if desired. :param measurement: (optional) Value and error of measurement in form ``(value, error)``. :param thresh: (optional) Number of "sigma" to allow for measurement constraint. :param selectfrac_skip,distribution_skip: Passed to :func:`StarPopulation.apply_constraint`. """ if prop in self.constraints: logging.info('re-doing {} constraint'.format(prop)) self.remove_constraint(prop) if measurement is not None: val,dval = measurement self.apply_constraint(MeasurementConstraint(getattr(self.stars,prop), val,dval,name=prop, thresh=thresh), selectfrac_skip=selectfrac_skip, distribution_skip=distribution_skip) else: self.apply_constraint(RangeConstraint(getattr(self.stars,prop), lo=lo,hi=hi,name=prop), selectfrac_skip=selectfrac_skip, distribution_skip=distribution_skip)
[docs] def apply_trend_constraint(self, limit, dt, distribution_skip=False, **kwargs): """ Constrains change in RV to be less than limit over time dt. Only works if ``dRV`` and ``Plong`` attributes are defined for population. :param limit: Radial velocity limit on trend. Must be :class:`astropy.units.Quantity` object, or else interpreted as m/s. :param dt: Time baseline of RV observations. Must be :class:`astropy.units.Quantity` object; else interpreted as days. :param distribution_skip: This is by default ``True``. *To be honest, I'm not exactly sure why. Might be important, might not (don't remember).* :param **kwargs: Additional keyword arguments passed to :func:`StarPopulation.apply_constraint`. """ if type(limit) != Quantity: limit = limit * u.m/u.s if type(dt) != Quantity: dt = dt * u.day dRVs = np.absolute(self.dRV(dt)) c1 = UpperLimit(dRVs, limit) c2 = LowerLimit(self.Plong, dt*4) self.apply_constraint(JointConstraintOr(c1,c2,name='RV monitoring', Ps=self.Plong,dRVs=dRVs), distribution_skip=distribution_skip, **kwargs)
[docs] def apply_cc(self, cc, distribution_skip=False, **kwargs): """ Apply contrast-curve constraint to population. Only works if object has ``Rsky``, ``dmag`` attributes :param cc: Contrast curve. :type cc: :class:`ContrastCurveConstraint` :param distribution_skip: This is by default ``True``. *To be honest, I'm not exactly sure why. Might be important, might not (don't remember).* :param **kwargs: Additional keyword arguments passed to :func:`StarPopulation.apply_constraint`. """ rs = self.Rsky.to('arcsec').value dmags = self.dmag(cc.band) self.apply_constraint(ContrastCurveConstraint(rs,dmags,cc,name=cc.name), distribution_skip=distribution_skip, **kwargs)
[docs] def apply_vcc(self, vcc, distribution_skip=False, **kwargs): """ Applies "velocity contrast curve" to population. That is, the constraint that comes from not seeing two sets of spectral lines in a high resolution spectrum. Only works if population has ``dmag`` and ``RV`` attributes. :param vcc: Velocity contrast curve; dmag vs. delta-RV. :type cc: :class:`VelocityContrastCurveConstraint` :param distribution_skip: This is by default ``True``. *To be honest, I'm not exactly sure why. Might be important, might not (don't remember).* :param **kwargs: Additional keyword arguments passed to :func:`StarPopulation.apply_constraint`. """ rvs = self.RV.value dmags = self.dmag(vcc.band) self.apply_constraint(VelocityContrastCurveConstraint(rvs,dmags,vcc, name='secondary spectrum'), distribution_skip=distribution_skip, **kwargs)
[docs] def set_maxrad(self,maxrad, distribution_skip=True): """ Adds a constraint that rejects everything with Rsky > maxrad Requires ``Rsky`` attribute, which should always have units. :param maxrad: The maximum angular value of Rsky. :type maxrad: :class:`astropy.units.Quantity` :param distribution_skip: This is by default ``True``. *To be honest, I'm not exactly sure why. Might be important, might not (don't remember).* """ self.maxrad = maxrad self.apply_constraint(UpperLimit(self.Rsky,maxrad, name='Max Rsky'), overwrite=True, distribution_skip=distribution_skip)
#self._apply_all_constraints() @property def constraint_df(self): """ A DataFrame representing all constraints, hidden or not """ df = pd.DataFrame() for name,c in self.constraints.items(): df[name] = c.ok for name,c in self.hidden_constraints.items(): df[name] = c.ok return df @property def _properties(self): return ['name']
[docs] def save_hdf(self,filename,path='',properties=None, overwrite=False, append=False): """Saves to HDF5 file. Subclasses should be sure to define ``_properties`` attribute to ensure that all correct attributes get saved. Load a saved population with :func:`StarPopulation.load_hdf`. Example usage:: >>> from vespa.stars import Raghavan_BinaryPopulation, StarPopulation >>> pop = Raghavan_BinaryPopulation(1., n=1000) >>> pop.save_hdf('test.h5') >>> pop2 = StarPopulation.load_hdf('test.h5') >>> pop == pop2 True >>> pop3 = Ragahavan_BinaryPopulation.load_hdf('test.h5') >>> pop3 == pop2 True :param filename: Name of HDF file. :param path: (optional) Path within HDF file to save object. :param properties: (optional) Names of any properties (in addition to those defined in ``_properties`` attribute) that you wish to save. (This is an old keyword, and should probably be removed. Feel free to ignore it.) :param overwrite: (optional) Whether to overwrite file if it already exists. If ``True``, then any existing file will be deleted before object is saved. Use ``append`` if you don't wish this to happen. :param append: (optional) If ``True``, then if the file exists, then only the particular path in the file will get written/overwritten. If ``False`` and both file and path exist, then an ``IOError`` will be raised. If ``False`` and file exists but not path, then no error will be raised. """ if os.path.exists(filename): with pd.HDFStore(filename) as store: if path in store: if overwrite: os.remove(filename) elif not append: raise IOError('{} in {} exists. '.format(path,filename) + 'Set either overwrite or append option.') if properties is None: properties = {} for prop in self._properties: properties[prop] = getattr(self, prop) self.stars.to_hdf(filename,'{}/stars'.format(path)) self.constraint_df.to_hdf(filename,'{}/constraints'.format(path)) if self.orbpop is not None: self.orbpop.save_hdf(filename, path=path+'/orbpop') with pd.HDFStore(filename) as store: attrs = store.get_storer('{}/stars'.format(path)).attrs attrs.selectfrac_skip = self.selectfrac_skip attrs.distribution_skip = self.distribution_skip attrs.name = self.name attrs.poptype = type(self) attrs.properties = properties
[docs] @classmethod def load_hdf(cls, filename, path=''): """Loads StarPopulation from .h5 file Correct properties should be restored to object, and object will be original type that was saved. Complement to :func:`StarPopulation.save_hdf`. Example usage:: >>> from vespa.stars import Raghavan_BinaryPopulation, StarPopulation >>> pop = Raghavan_BinaryPopulation(1., n=1000) >>> pop.save_hdf('test.h5') >>> pop2 = StarPopulation.load_hdf('test.h5') >>> pop == pop2 True >>> pop3 = Ragahavan_BinaryPopulation.load_hdf('test.h5') >>> pop3 == pop2 True :param filename: HDF file with saved :class:`StarPopulation`. :param path: Path within HDF file. :return: :class:`StarPopulation` or appropriate subclass; whatever was saved with :func:`StarPopulation.save_hdf`. """ stars = pd.read_hdf(filename,path+'/stars') constraint_df = pd.read_hdf(filename,path+'/constraints') with pd.HDFStore(filename) as store: has_orbpop = '{}/orbpop/df'.format(path) in store has_triple_orbpop = '{}/orbpop/long/df'.format(path) in store attrs = store.get_storer('{}/stars'.format(path)).attrs poptype = attrs.poptype new = poptype() #if poptype != type(self): # raise TypeError('Saved population is {}. Please instantiate proper class before loading.'.format(poptype)) distribution_skip = attrs.distribution_skip selectfrac_skip = attrs.selectfrac_skip name = attrs.name for kw,val in attrs.properties.items(): setattr(new, kw, val) #load orbpop if there orbpop = None if has_orbpop: orbpop = OrbitPopulation.load_hdf(filename, path=path+'/orbpop') elif has_triple_orbpop: orbpop = TripleOrbitPopulation.load_hdf(filename, path=path+'/orbpop') new.stars = stars new.orbpop = orbpop for n in constraint_df.columns: mask = np.array(constraint_df[n]) c = Constraint(mask,name=n) sel_skip = n in selectfrac_skip dist_skip = n in distribution_skip new.apply_constraint(c,selectfrac_skip=sel_skip, distribution_skip=dist_skip) return new
[docs]class BinaryPopulation(StarPopulation): """A population of binary stars. If :class:`vespa.orbits.OrbitPopulation` provided via ``orbpop`` keyword, that will describe the orbits; if not, then orbit population will be generated. Single stars may be indicated if desired by having their mass set to zero and all magnitudes set to ``inf``. This will usually be used via, e.g., the :class:`Raghavan_BinaryPopulation` subclass, rather than instantiated directly. :param primary,secondary: (:class:`pandas.DataFrame`) Properties of primary and secondary stars, respectively. These get merged into new ``stars`` attribute, with "_A" and "_B" tags. :param orbpop: (:class:`vespa.orbits.OrbitPopulation`, optional) Object describing orbits of stars. If not provided, then ``period`` and ``ecc`` keywords must be provided, or else they will be randomly generated (see below). :param period,ecc: Periods and eccentricities of orbits. If ``orbpop`` not passed, and these are not provided, then periods and eccs will be randomly generated according to the empirical distributions of the Raghavan (2010) and Multiple Star Catalog distributions using :func:`utils.draw_raghavan_periods` and :func:`utils.draw_eccs`. """ def __init__(self, stars=None, primary=None,secondary=None, orbpop=None, period=None, ecc=None, is_single=None, **kwargs): if stars is None and primary is not None: assert len(primary)==len(secondary) stars = pd.DataFrame() for c in primary.columns: if re.search('_mag',c): stars[c] = addmags(primary[c],secondary[c]) stars['{}_A'.format(c)] = primary[c] for c in secondary.columns: stars['{}_B'.format(c)] = secondary[c] stars['q'] = stars['mass_B']/stars['mass_A'] if orbpop is None: if period is None: period = draw_raghavan_periods(len(secondary)) if ecc is None: ecc = draw_eccs(len(secondary),period) orbpop = OrbitPopulation(primary['mass'], secondary['mass'], period,ecc) StarPopulation.__init__(self,stars=stars,orbpop=orbpop,**kwargs) @property def singles(self): """ Subset of stars that are single. """ return self.stars.query('mass_B == 0') @property def binaries(self): """ Subset of stars that are binaries. """ return self.stars.query('mass_B > 0')
[docs] def binary_fraction(self,query='mass_A >= 0'): """ Binary fraction of stars passing given query :param query: Query to pass to stars ``DataFrame``. """ subdf = self.stars.query(query) nbinaries = (subdf['mass_B'] > 0).sum() frac = nbinaries/len(subdf) return frac, frac/np.sqrt(nbinaries)
@property def Plong(self): """ Orbital period. Called "Plong" to be consistent with hierarchical populations that have this attribute mean the longer of two periods. """ return self.orbpop.P
[docs] def dmag(self,band): """ Difference in magnitude between primary and secondary stars :param band: Photometric bandpass. """ mag2 = self.stars['{}_mag_B'.format(band)] mag1 = self.stars['{}_mag_A'.format(band)] return mag2-mag1
[docs] def rsky_distribution(self,rmax=None,smooth=0.1,nbins=100): """ Distribution of projected separations Returns a :class:`simpledists.Hist_Distribution` object. :param rmax: (optional) Maximum radius to calculate distribution. :param dr: (optional) Bin width for histogram :param smooth: (optional) Smoothing parameter for :class:`simpledists.Hist_Distribution` :param nbins: (optional) Number of bins for histogram :return: :class:`simpledists.Hist_Distribution` describing Rsky distribution """ if rmax is None: if hasattr(self,'maxrad'): rmax = self.maxrad else: rmax = np.percentile(self.Rsky,99) dist = dists.Hist_Distribution(self.Rsky.value,bins=nbins,maxval=rmax,smooth=smooth) return dist
[docs] def rsky_lhood(self,rsky,**kwargs): """ Evaluates Rsky likelihood at provided position(s) :param rsky: position :param **kwargs: Keyword arguments passed to :func:`BinaryPopulation.rsky_distribution` """ dist = self.rsky_distribution(**kwargs) return dist(rsky)
[docs]class Simulated_BinaryPopulation(BinaryPopulation): """Simulates BinaryPopulation according to provide primary mass(es), generating functions, and stellar isochrone models. :param M: Primary mass(es). :type M: ``float`` or array-like :param q_fn: (optional) Mass ratio generating function. Must return 'n' mass ratios, and be called as follows:: qs = q_fn(n) :type q_fn: Callable function. :param P_fn: (optional) Orbital period generating function. Must return ``n`` orbital periods, and be called as follows:: Ps = P_fn(n) :type P_fn: Callable function. :param ecc_fn: (optional) Orbital eccentricity generating function. Must return ``n`` orbital eccentricities generated according to provided period(s):: eccs = ecc_fn(n,Ps) :type ecc_fn: Callable function. :param n: (optional) Number of instances to simulate. :param ichrone: (optional) Stellar model object from which to simulate stellar properties. Default is the default Dartmouth isochrone. :type ichrone: :class:`isochrones.Isochrone` :param bands: (optional) Photometric bands to simulate via ``ichrone``. :param age,feh: (optional) log(age) and metallicity at which to simulate population. Can be ``float`` or array-like :param minmass: (optional) Minimum mass to simulate. Default = 0.12. """ def __init__(self,M=None,q_fn=None,P_fn=None,ecc_fn=None, n=1e4,ichrone='mist', qmin=0.1, bands=BANDS, age=9.6,feh=0.0, minmass=0.12, **kwargs): if q_fn is None: q_fn = flat_massratio self.q_fn = q_fn self.qmin = qmin self.P_fn = P_fn self.ecc_fn = ecc_fn self.minmass = minmass if M is None: BinaryPopulation.__init__(self) #empty else: self.generate(M, age=age, feh=feh, ichrone=ichrone, n=n, bands=bands, **kwargs)
[docs] def generate(self, M, age=9.6, feh=0.0, ichrone='mist', n=1e4, bands=None, **kwargs): """ Function that generates population. Called by ``__init__`` if ``M`` is passed. """ ichrone = get_ichrone(ichrone, bands=bands) if np.size(M) > 1: n = np.size(M) else: n = int(n) M2 = M * self.q_fn(n, qmin=np.maximum(self.qmin,self.minmass/M)) P = self.P_fn(n) ecc = self.ecc_fn(n,P) mass = np.ascontiguousarray(np.ones(n)*M) mass2 = np.ascontiguousarray(M2) age = np.ascontiguousarray(age) feh = np.ascontiguousarray(feh) pri = ichrone(mass, age, feh, return_df=True, bands=bands) sec = ichrone(mass2, age, feh, return_df=True, bands=bands) BinaryPopulation.__init__(self, primary=pri, secondary=sec, period=P, ecc=ecc, **kwargs) return self
@property def _properties(self): return ['q_fn', 'qmin', 'P_fn', 'ecc_fn', 'minmass'] +\ super(Simulated_BinaryPopulation, self)._properties
[docs]class Raghavan_BinaryPopulation(Simulated_BinaryPopulation): """A Simulated_BinaryPopulation with empirical default distributions. Default mass ratio distribution is flat down to chosen minimum mass, default period distribution is from Raghavan (2010), default eccentricity/period relation comes from data from the Multiple Star Catalog (Tokovinin, xxxx). :param M: Primary mass(es) in solar masses. :param e_M: (optional) 1-sigma uncertainty in primary mass. :param n: (optional) Number of simulated instances to create. :param ichrone: (optional) Stellar models from which to generate binary companions. :type ichrone: :class:`isochrones.Isochrone` :param age,feh: (optional) Age and metallicity of system. :param name: (optional) Name of population. :param q_fn: (optional) A function that returns random mass ratios. Defaults to flat down to provided minimum mass. Must be able to be called as follows:: qs = q_fn(n, qmin, qmax) to provide ``n`` random mass ratios. """ def __init__(self,M=None,e_M=0,n=1e4,ichrone='mist', age=9.5, feh=0.0, q_fn=None, qmin=0.1, minmass=0.12, **kwargs): if M is not None: if q_fn is None: q_fn = flat_massratio if e_M != 0: M = stats.norm(M,e_M).rvs(n) Simulated_BinaryPopulation.__init__(self, M=M, q_fn=q_fn, P_fn=draw_raghavan_periods, ecc_fn=draw_eccs, n=n, qmin=qmin, ichrone=ichrone, age=age, feh=feh, minmass=minmass, **kwargs)
[docs]class TriplePopulation(StarPopulation): """A population of triple stars. (Primary) orbits (secondary + tertiary) in a long orbit; secondary and tertiary orbit each other with a shorter orbit. Single or double stars may be indicated if desired by having the masses of secondary or tertiary set to zero, and all magnitudes to ``inf``. :param stars: (optional) Full stars ``DataFrame``. If not passed, then primary, secondary, and tertiary must be. :param primary,secondary,tertiary: (optional) Properties of primary, secondary, and tertiary stars, in :class:`pandas.DataFrame` form. These will get merged into a new ``stars`` attribute, with "_A", "_B", and "_C" tags. :param orbpop: (optional) Object describing orbits of stars. If not provided, then the period and eccentricity keywords must be provided, or else they will be randomly generated (see below). :type orbpop: :class:`TripleOrbitPopulation` :param period_short,period_long,ecc_short,ecc_long: (array-like, optional) Orbital periods and eccentricities of short and long-period orbits. "Short" describes the close pair of the hierarchical system; "long" describes the separation between the two major components. Randomly generated if not provided. """ def __init__(self, stars=None, primary=None, secondary=None, tertiary=None, orbpop=None, period_short=None, period_long=None, ecc_short=0, ecc_long=0, **kwargs): if stars is None and primary is not None: assert len(primary)==len(secondary) and len(primary)==len(tertiary) N = len(primary) stars = pd.DataFrame() for c in primary.columns: if re.search('_mag',c): stars[c] = addmags(primary[c],secondary[c],tertiary[c]) stars['{}_A'.format(c)] = primary[c] for c in secondary.columns: stars['{}_B'.format(c)] = secondary[c] for c in tertiary.columns: stars['{}_C'.format(c)] = tertiary[c] if orbpop is None: if period_long is None or period_short is None: period_1 = draw_raghavan_periods(N) period_2 = draw_msc_periods(N) period_short = np.minimum(period_1, period_2) period_long = np.maximum(period_1, period_2) if ecc_short is None: ecc_short = draw_eccs(N,period_short) if ecc_long is None: ecc_long = draw_eccs(N,period_long) M1 = stars['mass_A'] M2 = stars['mass_B'] M3 = stars['mass_C'] orbpop = TripleOrbitPopulation(M1,M2,M3,period_long,period_short, ecclong=ecc_long, eccshort=ecc_short) StarPopulation.__init__(self, stars=stars, orbpop=orbpop, **kwargs)
[docs] def dmag(self, band): """ Difference in magnitudes between fainter and brighter components in band. :param band: Photometric bandpass. """ m1 = self.stars['{}_mag_A'.format(band)] m2 = addmags(self.stars['{}_mag_B'.format(band)], self.stars['{}_mag_C'.format(band)]) return np.abs(m2-m1)
[docs] def A_brighter(self, band='g'): """ Instances where star A is brighter than (B+C) """ mA = self.stars['{}_mag_A'.format(band)] mBC = addmags(self.stars['{}_mag_B'.format(band)], self.stars['{}_mag_C'.format(band)]) return mA < mBC
[docs] def BC_brighter(self, band='g'): """ Instances where stars (B+C) are brighter than star A """ return ~self.A_brighter(band=band)
[docs] def dRV(self, dt, band='g'): """Returns dRV of star A, if A is brighter than B+C, or of star B if B+C is brighter """ return (self.orbpop.dRV_1(dt)*self.A_brighter(band) + self.orbpop.dRV_2(dt)*self.BC_brighter(band))
@property def Plong(self): """ Longer of two orbital periods in Triple system """ return self.orbpop.orbpop_long.P @property def singles(self): return self.stars.query('mass_B==0 and mass_C==0') @property def binaries(self): return self.stars.query('mass_B > 0 and mass_C==0') @property def triples(self): return self.stars.query('mass_B > 0 and mass_C > 0')
[docs] def binary_fraction(self,query='mass_A > 0', unc=False): """ Binary fraction of stars following given query """ subdf = self.stars.query(query) nbinaries = ((subdf['mass_B'] > 0) & (subdf['mass_C']==0)).sum() frac = nbinaries/len(subdf) if unc: return frac, frac/np.sqrt(nbinaries) else: return frac
[docs] def triple_fraction(self,query='mass_A > 0', unc=False): """ Triple fraction of stars following given query """ subdf = self.stars.query(query) ntriples = ((subdf['mass_B'] > 0) & (subdf['mass_C'] > 0)).sum() frac = ntriples/len(subdf) if unc: return frac, frac/np.sqrt(ntriples) else: return frac
[docs]class Observed_BinaryPopulation(BinaryPopulation): """ A population of binary stars matching observed constraints. :param mags: Observed apparent magnitudes :type mags: ``dict`` :param Teff,logg,feh: Observed spectroscopic properties of primary star, if available. Format: ``(value, err)``. :param starmodel: :class:`isochrones.BinaryStarModel`. If not passed, it will be generated. """ def __init__(self, mags=None, mag_errs=None, Teff=None, logg=None, feh=None, starmodel=None, n=2e4, ichrone='mist', bands=BANDS, period=None, ecc=None, orbpop=None, stars=None, **kwargs): self.mags = mags self.mag_errs = mag_errs self.Teff = Teff self.logg = logg self.feh = feh self._starmodel = starmodel if stars is None and mags is not None \ or starmodel is not None: self.generate(mags=mags, mag_errs=mag_errs, n=n, ichrone=ichrone, starmodel=starmodel, Teff=Teff, logg=logg, feh=feh, bands=bands, orbpop=orbpop, period=period, ecc=ecc, **kwargs) else: self.stars = stars self.orbpop = orbpop @property def starmodel_props(self): """Default mag_err is 0.05, arbitrarily """ props = {} mags = self.mags mag_errs = self.mag_errs for b in mags.keys(): if np.size(mags[b])==2: props[b] = mags[b] elif np.size(mags[b])==1: mag = mags[b] try: e_mag = mag_errs[b] except: e_mag = 0.05 props[b] = (mag, e_mag) if self.Teff is not None: props['Teff'] = self.Teff if self.logg is not None: props['logg'] = self.logg if self.feh is not None: props['feh'] = self.feh return props
[docs] def generate(self, mags=None, mag_errs=None, n=1e4, ichrone='mist', starmodel=None, Teff=None, logg=None, feh=None, bands=BANDS, orbpop=None, period=None, ecc=None, **kwargs): ichrone = get_ichrone(ichrone, bands=bands) if starmodel is None: params = self.starmodel_props logging.info('Fitting BinaryStarModel to {}...'.format(params)) starmodel = BinaryStarModel(ichrone, **params) starmodel.fit() logging.info('BinaryStarModel fit Done.') # if type(starmodel) != BinaryStarModel: # raise TypeError('starmodel must be BinaryStarModel.') self._starmodel = starmodel samples = starmodel.random_samples(n) age, feh = (np.ascontiguousarray(samples['age_0']), np.ascontiguousarray(samples['feh_0'])) dist, AV = (samples['distance_0'], samples['AV_0']) mass_A, mass_B = (np.ascontiguousarray(samples['mass_0_0']), np.ascontiguousarray(samples['mass_0_1'])) primary = ichrone(mass_A, age, feh, distance=dist, AV=AV, bands=BANDS) secondary = ichrone(mass_B, age, feh, distance=dist, AV=AV, bands=BANDS) BinaryPopulation.__init__(self, primary=primary, secondary=secondary, orbpop=orbpop, period=period, ecc=ecc, **kwargs)
[docs] def save_hdf(self, filename, path='', **kwargs): super(Observed_BinaryPopulation,self).save_hdf(filename, path=path, **kwargs) self.starmodel.save_hdf(filename, path='{}/starmodel'.format(path), append=True)
[docs] @classmethod def load_hdf(cls, filename, path=''): pop = super(Observed_BinaryPopulation, cls).load_hdf(filename, path=path) pop._starmodel = BinaryStarModel.load_hdf(filename, path='{}/starmodel'.format(path)) return pop
# def __getattr__(self, attr): # # Don't remember why I've done this. Must be a reason. # if attr not in ['starmodel','_starmodel']: # return getattr(self.starmodel, attr)
[docs]class Observed_TriplePopulation(TriplePopulation): """ A population of triple stars matching observed constraints. :param mags: Observed apparent magnitudes :type mags: ``dict`` :param Teff,logg,feh: Observed spectroscopic properties of primary star, if available. Format: ``(value, err)``. :param starmodel: :class:`isochrones.TripleStarModel`. If not passed, it will be generated. """ def __init__(self, mags=None, mag_errs=None, Teff=None, logg=None, feh=None, starmodel=None, n=2e4, ichrone='mist', bands=BANDS, period=None, ecc=None, orbpop=None, stars=None, **kwargs): self.mags = mags self.mag_errs = mag_errs self.Teff = Teff self.logg = logg self.feh = feh self._starmodel = starmodel if stars is None and mags is not None \ or starmodel is not None: self.generate(mags=mags, mag_errs=mag_errs, n=n, ichrone=ichrone, starmodel=starmodel, Teff=Teff, logg=logg, feh=feh, bands=bands, orbpop=orbpop, period=period, ecc=ecc, **kwargs) else: self.stars = stars self.orbpop = orbpop @property def starmodel_props(self): """Default mag_err is 0.05, arbitrarily """ props = {} mags = self.mags mag_errs = self.mag_errs for b in mags.keys(): if np.size(mags[b])==2: props[b] = mags[b] elif np.size(mags[b])==1: mag = mags[b] try: e_mag = mag_errs[b] except: e_mag = 0.05 props[b] = (mag, e_mag) if self.Teff is not None: props['Teff'] = self.Teff if self.logg is not None: props['logg'] = self.logg if self.feh is not None: props['feh'] = self.feh return props
[docs] def generate(self, mags=None, mag_errs=None, n=1e4, ichrone='mist', starmodel=None, Teff=None, logg=None, feh=None, bands=BANDS, orbpop=None, period=None, ecc=None, **kwargs): ichrone = get_ichrone(ichrone, bands=bands) if starmodel is None: params = self.starmodel_props logging.info('Fitting TripleStarModel to {}...'.format(params)) starmodel = TripleStarModel(ichrone, **params) starmodel.fit() logging.info('TripleStarModel fit Done.') # if type(starmodel) != TripleStarModel: # raise TypeError('starmodel must be TripleStarModel.') self._starmodel = starmodel samples = starmodel.random_samples(n) age, feh = (np.ascontiguousarray(samples['age_0']), np.ascontiguousarray(samples['feh_0'])) dist, AV = (samples['distance_0'], samples['AV_0']) mass_A, mass_B, mass_C = (np.ascontiguousarray(samples['mass_0_0']), np.ascontiguousarray(samples['mass_0_1']), np.ascontiguousarray(samples['mass_0_2'])) primary = ichrone(mass_A, age, feh, distance=dist, AV=AV, bands=BANDS) secondary = ichrone(mass_B, age, feh, distance=dist, AV=AV, bands=BANDS) tertiary = ichrone(mass_C, age, feh, distance=dist, AV=AV, bands=BANDS) TriplePopulation.__init__(self, primary=primary, secondary=secondary, tertiary=tertiary, orbpop=orbpop, period_short=period, ecc_short=ecc, **kwargs)
[docs] def save_hdf(self, filename, path='', **kwargs): super(Observed_TriplePopulation,self).save_hdf(filename, path=path, **kwargs) self.starmodel.save_hdf(filename, path='{}/starmodel'.format(path), append=True)
[docs] @classmethod def load_hdf(cls, filename, path=''): pop = super(Observed_TriplePopulation, cls).load_hdf(filename, path=path) pop._starmodel = TripleStarModel.load_hdf(filename, path='{}/starmodel'.format(path)) return pop
# def __getattr__(self, attr): # # Why did I do this again? Probably a reason... # if attr not in ['starmodel', '_starmodel']: # return getattr(self.starmodel, attr) class MultipleStarPopulation(TriplePopulation): """A population of single, double, and triple stars, generated according to prescription. :param mA: (optional) Mass of primary star(s). Default=1. If array, then the simulation will be lots of individual systems; if float, then the simulation will be lots of realizations of one system. :param age,feh: (optional) Age, feh of system(s). :param f_binary,f_triple: (optional) Fraction of systems that should be binaries or triples. Should have ``f_binary + f_triple < 1``, though if ``f_binary + f_triple >= 1``, then ``f_binary`` will implicitly be treated as ``1 - f_triple``. :param qmin: (optional) Minimum mass ratio. :param minmass: (optional) Minimum stellar mass to simulate. :param n: (optional) Size of simulation (if ``mA`` is a scalar). If ``mA`` is array-like, then ``n = len(mA)``. :param ichrone: (:class:`isochrones.Isochrone`, optional) Stellar model isochrone to generate simulations. Defaults to Dartmouth model grid. :param bands: (optional) Photometry bandpasses to simulate using ``ichrone``. :param multmass_fn,period_long_fn,period_short_fn,ecc_fn: (optional) Functions to generate masses, orbital periods, and eccentricities. Defaults built in. See :class`TriplePopulation`. :param orbpop: (optional) Object describing orbits of stars. If not provided, orbits will be randomly generated according to generating functions. :type orbpop: :class:`orbits.TripleOrbitPopulation` Additional keyword arguments passed to :class:`TriplePopulation`. """ def __init__(self, mA=None, age=9.6, feh=0.0, f_binary=0.4, f_triple=0.12, qmin=0.1, minmass=0.11, n=1e4, ichrone='mist', multmass_fn=mult_masses, period=None, period_long_fn=draw_raghavan_periods, period_short_fn=draw_msc_periods, period_short=None, period_long=None, ecc_fn=draw_eccs, ecc_kws=None, bands=BANDS, orbpop=None, stars=None, **kwargs): #These get set even if stars is passed self.f_binary = f_binary self.f_triple = f_triple self.qmin = qmin self.minmass = minmass self.multmass_fn = multmass_fn self.period_long_fn = period_long_fn self.period_short_fn = period_short_fn if period_long is not None: self.period_long_fn = None if period_short is not None: self.period_short_fn = None self.ecc_fn = ecc_fn if stars is None and mA is not None: self.generate(mA=mA, age=age, feh=feh, n=n, ichrone=ichrone, orbpop=orbpop, bands=bands, period_long=period_long, period_short=period_short, **kwargs) else: TriplePopulation.__init__(self, stars=stars, orbpop=orbpop, **kwargs) def generate(self, mA=1, age=9.6, feh=0.0, n=1e5, ichrone='mist', orbpop=None, bands=None, **kwargs): """ Generates population. Called if :class:`MultipleStarPopulation` is initialized without providing ``stars``, and if ``mA`` is provided. """ ichrone = get_ichrone(ichrone, bands=bands) n = int(n) #star with m1 orbits (m2+m3). So mA (most massive) # will correspond to either m1 or m2. m1, m2, m3 = self.multmass_fn(mA, f_binary=self.f_binary, f_triple=self.f_triple, qmin=self.qmin, minmass=self.minmass, n=n) #reset n if need be n = len(m1) feh = np.ascontiguousarray(np.atleast_1d(feh)) age = np.ascontiguousarray(age) #generate stellar properties primary = ichrone(np.ascontiguousarray(m1), age, feh, bands=bands) secondary = ichrone(np.ascontiguousarray(m2),age,feh, bands=bands) tertiary = ichrone(np.ascontiguousarray(m3),age,feh, bands=bands) #clean up columns that become nan when called with mass=0 # Remember, we want mass=0 and mags=inf when something doesn't exist no_secondary = (m2==0) no_tertiary = (m3==0) for c in secondary.columns: # if re.search('_mag',c): secondary[c][no_secondary] = np.inf tertiary[c][no_tertiary] = np.inf secondary['mass'][no_secondary] = 0 tertiary['mass'][no_tertiary] = 0 if kwargs['period_short'] is None: if kwargs['period_long'] is None: period_1 = self.period_long_fn(n) period_2 = self.period_short_fn(n) kwargs['period_short'] = np.minimum(period_1, period_2) kwargs['period_long'] = np.maximum(period_1, period_2) else: kwargs['period_short'] = self.period_short_fn(n) #correct any short periods that are longer than period_long bad = kwargs['period_short'] > kwargs['period_long'] n_bad = bad.sum() good_inds = np.where(~bad)[0] inds = np.random.randint(len(good_inds),size=n_bad) kwargs['period_short'][bad] = \ kwargs['period_short'][good_inds[inds]] else: if kwargs['period_long'] is None: kwargs['period_long'] = self.period_long_fn(n) #correct any long periods that are shorter than period_short bad = kwargs['period_long'] < kwargs['period_short'] n_bad = bad.sum() good_inds = np.where(~bad)[0] inds = np.random.randint(len(good_inds),size=n_bad) kwargs['period_long'][bad] = \ kwargs['period_long'][good_inds[inds]] if 'ecc_short' not in kwargs: kwargs['ecc_short'] = self.ecc_fn(n, kwargs['period_short']) if 'ecc_long' not in kwargs: kwargs['ecc_long'] = self.ecc_fn(n, kwargs['period_long']) TriplePopulation.__init__(self, primary=primary, secondary=secondary, tertiary=tertiary, orbpop=orbpop, **kwargs) return self @property def _properties(self): return ['f_binary', 'f_triple', 'qmin', 'minmass', 'period_long_fn', 'period_short_fn', 'ecc_fn'] + super(MultipleStarPopulation, self)._properties
[docs]class BGStarPopulation(StarPopulation): """Background star population This should usually be accessed via the :class:`BGStarPopulation_TRILEGAL` subclass. :param stars: (:class:`pandas.DataFrame`, optional) Properties of stars. Must have 'distance' column defined. :param mags: (optional) Magnitudes of primary (foreground) stars. :param maxrad: (optional) Maximum distance (arcseconds) of BG stars from foreground primary star. :param density: (optional) Density in arcsec^{-2} for BG star population. :param **kwargs: Additional keyword arguments passed to :class:`StarPopulation`. """ def __init__(self,stars=None,mags=None,maxrad=1800,density=None, **kwargs): self.mags = mags if stars is not None: if 'distance' not in stars: raise ValueError('Stars must have distance column defined') if density is None: self.density = len(stars)/((3600.*u.arcsec)**2) #default is for TRILEGAL sims to be 1deg^2 else: if type(density)!=Quantity: raise ValueError('Provided stellar density must have units.') self.density = density if type(maxrad) != Quantity: self._maxrad = maxrad*u.arcsec #arcsec else: self._maxrad = maxrad StarPopulation.__init__(self,stars=stars, **kwargs) if stars is not None: self.stars['Rsky'] = randpos_in_circle(len(stars),maxrad,return_rad=True) @property def Rsky(self): """ Project on-sky separation between primary star and BG stars """ return np.array(self.stars['Rsky'])*u.arcsec @property def maxrad(self): return self._maxrad @maxrad.setter def maxrad(self,value): if type(value) != Quantity: value = value*u.arcsec self.stars['Rsky'] *= (value/self._maxrad).decompose() self._maxrad = value #look for contrast curve constraints & re-apply them cc_names = [] cc_list = [] for k,c in self.constraints.items(): if isinstance(c, ContrastCurveConstraint): cc_names.append(k) cc_list.append(c.cc) for name,cc in zip(cc_names, cc_list): self.remove_constraint(name) self.apply_cc(cc) logging.warning('maxrad changed for {} population; {} contrast curve re-applied'.format(self.name, cc.name))
[docs] def dmag(self,band): """ Magnitude difference between primary star and BG stars """ if self.mags is None: raise ValueError('dmag is not defined because primary mags are not defined for this population.') return self.stars['{}_mag'.format(band)] - self.mags[band]
@property def _properties(self): return ['mags', '_maxrad', 'density'] + \ super(BGStarPopulation, self)._properties
[docs]class BGStarPopulation_TRILEGAL(BGStarPopulation): """Creates TRILEGAL simulation for ra,dec; loads as BGStarPopulation :param filename: Desired name of the TRILEGAL simulation. Can either have '.h5' extension or not. If filename (or 'filename.h5') exists locally, it will be loaded; otherwise, TRILEGAL will be called via the ``get_trilegal`` perl script, and the file will be generated. :param ra,dec: (optional) Sky coordinates of TRILEGAL simulation. Must be passed if generating TRILEGAL simulation and not just reading from existing file. :param mags: (optional) Dictionary of primary star magnitudes (if this is being used to generate a background population behind a particular foreground star). This must be set in order to use the ``dmag`` attribute. :type mags: (optional) ``dict`` :param maxrad: (optional) Maximum distance (arcsec) out to which to place simulated stars. :param **kwargs: Additional keyword arguments passed to :func:`stars.trilegal.get_trilegal` """ def __init__(self,filename=None,ra=None,dec=None,mags=None,maxrad=1800, **kwargs): self.trilegal_args = {} if filename is None: BGStarPopulation.__init__(self) else: m = re.search('(.*)\.h5$',filename) if not m: h5filename = '{}.h5'.format(filename) basefilename = filename else: h5filename = filename basefilename = m.group(1) if os.path.exists(h5filename): logging.info('Loading TRILEGAL simulation from {}'.format(h5filename)) stars = pd.read_hdf(h5filename,'df') else: if ra is None or dec is None: raise ValueError('Must provide ra,dec if simulation file does not already exist.') logging.info('Getting TRILEGAL simulation at {}, {}...'.format(ra,dec)) get_trilegal(basefilename,ra,dec,**kwargs) logging.info('Done.') stars = pd.read_hdf(h5filename,'df') with pd.HDFStore(h5filename) as store: self.trilegal_args = store.get_storer('df').attrs.trilegal_args c = SkyCoord(self.trilegal_args['l'],self.trilegal_args['b'], unit='deg',frame='galactic') self.coords = c.icrs area = self.trilegal_args['area']*(u.deg)**2 density = len(stars)/area stars['distmod'] = stars['m-M0'] stars['distance'] = dfromdm(stars['distmod']) BGStarPopulation.__init__(self,stars,mags=mags,maxrad=maxrad, density=density,**kwargs) @property def _properties(self): return ['trilegal_args'] + \ super(BGStarPopulation_TRILEGAL,self)._properties
############## Exceptions ################ class PoorColorsError(Exception): pass #methods below should be applied to relevant subclasses ''' def set_dmaglim(self,dmaglim): if not (hasattr(self,'blendmag') and hasattr(self,'dmaglim')): return self.dmaglim = dmaglim self.apply_constraint(LowerLimit(self.dmags(),self.dmaglim,name='bright blend limit'),overwrite=True) self._apply_all_constraints() #not necessary? '''