from __future__ import division,print_function
import sys,re,os
import logging
on_rtd = os.environ.get('READTHEDOCS') == 'True'
if not on_rtd:
import numpy as np
from astropy.coordinates import SkyCoord,Angle
import numpy.random as rand
import matplotlib.pyplot as plt
import pandas as pd
from astropy import units as u
from astropy.units.quantity import Quantity
from astropy import constants as const
MSUN = const.M_sun.cgs.value
AU = const.au.cgs.value
DAY = 86400
G = const.G.cgs.value
else:
np = None
SkyCoord, Angle = (None, None)
rand = None
plt = None
pd = None
u = None
Quantity = None
const = None
MSUN, AU, DAY, G = (None, None, None, None)
from .utils import semimajor,random_spherepos,orbitproject,orbit_posvel
from ..plotutils import setfig
from ..hashutils import hashcombine, hashdf
[docs]class TripleOrbitPopulation(object):
"""
Stars 2 and 3 orbit each other (short orbit), far from star 1 (long orbit)
This object defines the orbits of a triple star system,
with orbits calculated assuming the "long" orbit does not perturb
the "short" orbit, which will not be true in the long run, but should
be true over short timescales as long as ``Plong >> Pshort``.
A :class:`TripleOrbitPopulation` is essentially made up of two
:class:`OrbitPopulation` objects: one for the "long" orbit
and one for the "short."
:param M1,M2,M3:
Masses of stars. Stars 2 and 3 are in a short orbit, far away from star 1.
If not :class:`astropy.units.Quantity` objects, then assumed to be
in solar mass units. May be single value or array-like.
:param Plong,Pshort:
Orbital Periods. Plong is orbital period of 2+3 and 1; Pshort is orbital
period of 2 and 3. If not :class:`astropy.units.Quantity` objects,
assumed to be in days. Can be single value or array-like.
N.B. If any item in Pshort happens to be
longer than the corresponding item in Plong, they will be switched.
:param ecclong,eccshort: (optional)
Eccentricities. Same story (long vs. short). Default=0 (circular).
Can be single value or array-like.
:param n: (optional)
Number of systems to simulate (if ``M1``, ``M2``, ``M3`` aren't
arrays of size > 1 already).
:param mean_anomaly_short,mean_anomaly_long: (optional)
Mean anomalies. This is only passed if you need to restore a
particular specific configuration (i.e., a particular saved simulation),
e.g., as done by :func:`TripleOrbitPopulation.from_df`.
If not provided, then randomized on (0, 2pi).
:param obsx_short,obsy_short,obsz_short: (optional)
"Observer" positions for the short orbit. Also only passed for purposes
of restoring configuration.
:param obsx_long,obsy_long,obsz_long: (optional)
"Observer" positions for long orbit. Also only passed for purposes of
restoring configuration.
:param obspos_short,obspos_long: (optional)
"Observer positions for short and long orbits, provided
as :class:`astropy.SkyCoord` objects. These will replace
obsx_short/long, obsy_short/long, obsz_short/long parameters if present.
"""
def __init__(self,M1,M2,M3,Plong,Pshort,ecclong=0,eccshort=0,n=None,
mean_anomaly_long=None,obsx_long=None,obsy_long=None,obsz_long=None,
obspos_long=None,
mean_anomaly_short=None,obsx_short=None,
obsy_short=None,obsz_short=None,
obspos_short=None):
Pshort, Plong = (np.minimum(Pshort,Plong), np.maximum(Pshort,Plong))
#if Plong < Pshort:
# Pshort,Plong = (Plong, Pshort)
self.orbpop_long = OrbitPopulation(M1,M2+M3,Plong,ecc=ecclong,n=n,
mean_anomaly=mean_anomaly_long,
obsx=obsx_long,obsy=obsy_long,obsz=obsz_long)
self.orbpop_short = OrbitPopulation(M2,M3,Pshort,ecc=eccshort,n=n,
mean_anomaly=mean_anomaly_short,
obsx=obsx_short,obsy=obsy_short,obsz=obsz_short)
def __hash__(self):
return hashcombine(self.orbpop_long, self.orbpop_short)
@property
def RV(self):
"""
Instantaneous RV of star 1 with respect to system center-of-mass
"""
return self.RV_1
@property
def RV_1(self):
"""
Instantaneous RV of star 1 with respect to system center-of-mass
"""
return self.orbpop_long.RV * (self.orbpop_long.M2 / (self.orbpop_long.M1 + self.orbpop_long.M2))
@property
def RV_2(self):
"""
Instantaneous RV of star 2 with respect to system center-of-mass
"""
return -self.orbpop_long.RV * (self.orbpop_long.M1 /
(self.orbpop_long.M1 + self.orbpop_long.M2)) +\
self.orbpop_short.RV_com1
@property
def RV_3(self):
"""
Instantaneous RV of star 3 with respect to system center-of-mass
"""
return -self.orbpop_long.RV * (self.orbpop_long.M1 / (self.orbpop_long.M1 + self.orbpop_long.M2)) +\
self.orbpop_short.RV_com2
@property
def Rsky(self):
"""
Projected separation of star 2+3 pair from star 1 [projected AU]
"""
return self.orbpop_long.Rsky
[docs] def dRV(self,dt):
"""
Returns difference in RVs (separated by time dt) of star 1.
:param dt:
Time separation for which to compute RV change. If not an
:class:`astropy.units.Quantity` object, then assumed to be in days.
"""
return self.dRV_1(dt)
[docs] def dRV_1(self,dt):
"""
Returns difference in RVs (separated by time dt) of star 1.
:param dt:
Time separation for which to compute RV change. If not an
:class:`astropy.units.Quantity` object, then assumed to be in days.
"""
return self.orbpop_long.dRV(dt,com=True)
[docs] def dRV_2(self,dt):
"""
Returns difference in RVs (separated by time dt) of star 2.
:param dt:
Time separation for which to compute RV change. If not an
:class:`astropy.units.Quantity` object, then assumed to be in days.
"""
return -self.orbpop_long.dRV(dt) * (self.orbpop_long.M1/(self.orbpop_long.M1 + self.orbpop_long.M2)) +\
self.orbpop_short.dRV(dt,com=True)
[docs] def dRV_3(self,dt):
"""
Returns difference in RVs (separated by time dt) of star 3.
:param dt:
Time separation for which to compute RV change. If not an
:class:`astropy.units.Quantity` object, then assumed to be in days.
"""
return -self.orbpop_long.dRV(dt) * (self.orbpop_long.M1/(self.orbpop_long.M1 + self.orbpop_long.M2)) -\
self.orbpop_short.dRV(dt) * (self.orbpop_short.M1/(self.orbpop_short.M1 + self.orbpop_short.M2))
[docs] def save_hdf(self,filename,path=''):
"""
Save to HDF5 file in desired path.
"""
self.orbpop_long.save_hdf(filename,'{}/long'.format(path))
self.orbpop_short.save_hdf(filename,'{}/short'.format(path))
def __add__(self, other):
if type(self) != type(other):
raise TypeError('Can only add like types of TripleOrbitPopulation')
newdf_long = pd.concat((self.orbpop_long.dataframe, other.orbpop_long.dataframe))
newdf_short = pd.concat((self.orbpop_short.dataframe, other.orbpop_short.dataframe))
return TripleOrbitPopulation_FromDF(newdf_long, newdf_short)
[docs] @classmethod
def from_df(cls, df_long, df_short):
"""
Builds TripleOrbitPopulation from DataFrame
``DataFrame`` objects must be of appropriate form to pass
to :func:`OrbitPopulation.from_df`.
:param df_long, df_short:
:class:`pandas.DataFrame` objects to pass to
:func:`OrbitPopulation.from_df`.
"""
pop = cls(1,1,1,1,1) #dummy population
pop.orbpop_long = OrbitPopulation.from_df(df_long)
pop.orbpop_short = OrbitPopulation.from_df(df_short)
return pop
[docs] @classmethod
def load_hdf(cls, filename, path=''):
"""
Load TripleOrbitPopulation from saved .h5 file.
:param filename:
HDF file name.
:param path:
Path within HDF file where data is stored.
"""
df_long = pd.read_hdf(filename,'{}/long/df'.format(path))
df_short = pd.read_hdf(filename,'{}/short/df'.format(path))
return cls.from_df(df_long, df_short)
[docs]class OrbitPopulation(object):
"""Population of orbits.
:param M1,M2:
Primary and secondary masses (if not ``Quantity``,
assumed to be in solar masses). Can be ``float``, array-like
or ``Quantity``.
:param P:
Orbital period(s) (if not ``Quantity``, assumed to be in days)
:type P:
``float``, array-like or ``Quantity``.
:param ecc: (``float`` or array-like, optional)
Eccentricities.
:param n: (optional)
Number of instances to simulate. If not provided, then this number
will be the length of ``M2`` (or ``P``) provided.
:param mean_anomaly: (optional)
Mean anomalies of orbits. Usually this will just be set randomly,
but can be provided to initialize a particular state (e.g., when
restoring an :class:`OrbitPopulation` object from a saved state).
:param obsx, obsy, obsz: (optional)
"Observer" positions to define coordinates. Will be set randomly
if not provided.
:param obspos: (optional)
"Observer" positions may be set with a ``SkyCoord`` object (replaces
obsx, obsy, obsz)
:type obspos:
:class:`astropy.coordinates.SkyCoord`
"""
def __init__(self,M1,M2,P,ecc=0,n=None,
mean_anomaly=None,obsx=None,obsy=None,obsz=None,
obspos=None):
if type(M1) != Quantity:
M1 = Quantity(M1, unit='M_sun')
if type(M2) != Quantity:
M2 = Quantity(M2, unit='M_sun')
if type(P) != Quantity:
P = Quantity(P, unit='day')
if n is None:
if M2.size==1:
n = P.size
else:
n = M2.size
self.M1 = M1
self.M2 = M2
self.N = n
self.P = P
if np.size(ecc) == 1:
ecc = np.ones(n)*ecc
self.ecc = ecc
mred = M1*M2/(M1+M2)
self.semimajor = semimajor(P,mred) #AU
self.mred = mred
if mean_anomaly is None:
M = rand.uniform(0,2*np.pi,size=n)
else:
M = mean_anomaly
self.M = M
#coordinates of random observers
if obspos is None:
if obsx is None:
self.obspos = random_spherepos(n)
else:
self.obspos = SkyCoord(obsx,obsy,obsz,representation='cartesian')
else:
self.obspos = obspos
#get positions, velocities relative to M1
position,velocity = orbit_posvel(self.M,self.ecc,self.semimajor.value,
self.mred.value,
self.obspos)
self.position = position
self.velocity = velocity
def __add__(self, other):
if type(self) != type(other):
raise TypeError('Can only add like types of OrbitPopulation')
newdf = pd.concat((self.dataframe, other.dataframe))
return OrbitPopulation_FromDF(newdf)
def __hash__(self):
return hashdf(self.dataframe)
@property
def Rsky(self):
"""
Sky separation of stars, in projected AU.
"""
return np.sqrt(self.position.x**2 + self.position.y**2)
@property
def RV(self):
"""
Relative radial velocities of two stars
"""
return self.velocity.z
@property
def RV_com1(self):
"""
RVs of star 1 relative to center-of-mass
"""
return self.RV * (self.M2 / (self.M1 + self.M2))
@property
def RV_com2(self):
"""
RVs of star 2 relative to center-of-mass
"""
return -self.RV * (self.M1 / (self.M1 + self.M2))
[docs] def dRV(self,dt,com=False):
"""Change in RV of star 1 for time separation dt (default=days)
:param dt:
Time separation for which to compute RV change. If not a ``Quantity``,
then assumed to be in days.
:type dt:
float, array-like, or ``Quantity``
:param com: (``bool``, optional)
If ``True``, then return dRV of star 1 in center-of-mass frame.
:return dRV:
Change in radial velocity over time ``dt``.
"""
if type(dt) != Quantity:
dt *= u.day
mean_motions = np.sqrt(G*(self.mred)*MSUN/(self.semimajor*AU)**3)
mean_motions = np.sqrt(const.G*(self.mred)/(self.semimajor)**3)
newM = self.M + mean_motions * dt
pos,vel = orbit_posvel(newM,self.ecc,self.semimajor.value,
self.mred.value,
self.obspos)
if com:
return (vel.z - self.RV) * (self.M2 / (self.M1 + self.M2))
else:
return vel.z-self.RV
[docs] def RV_timeseries(self,ts,recalc=False):
"""
Radial Velocity time series for star 1 at given times ts.
:param ts:
Times. If not ``Quantity``, assumed to be in days.
:type ts:
array-like or ``Quantity``
:param recalc: (optional)
If ``False``, then if called with the exact same ``ts``
as last call, it will return cached calculation.
"""
if type(ts) != Quantity:
ts *= u.day
if not recalc and hasattr(self,'RV_measurements'):
if (ts == self.ts).all():
return self._RV_measurements
else:
pass
RVs = Quantity(np.zeros((len(ts),self.N)),unit='km/s')
for i,t in enumerate(ts):
RVs[i,:] = self.dRV(t,com=True)
self._RV_measurements = RVs
self.ts = ts
return RVs
@property
def dataframe(self):
"""
Summary DataFrame of OrbitPopulation
Used to save/restore state.
"""
if not hasattr(self,'_dataframe'):
obspos = self.obspos.represent_as('cartesian')
obsx, obsy, obsz = (obspos.x,obspos.y,obspos.z)
df = pd.DataFrame({'M1':self.M1,
'M2':self.M2,
'P':self.P,
'ecc':self.ecc,
'mean_anomaly':self.M,
'obsx':obsx,
'obsy':obsy,
'obsz':obsz})
self._dataframe = df
return self._dataframe
[docs] def scatterplot(self,fig=None,figsize=(7,7),ms=0.5,
rmax=None,log=False,**kwargs):
"""
Makes a scatter plot of projected X-Y sky separation
:param fig: (optional)
Passed to :func:`plotutils.setfig`
:param figsize: (optional)
Size of figure (in).
:param ms: (optional)
Marker size
:param rmax: (optional)
Maximum projected radius to plot.
:param log: (optional)
Whether to plot with log scale.
:param **kwargs:
Additional keyword arguments passed to ``plt.plot``.
"""
setfig(fig,figsize=figsize)
plt.plot(self.position.x.value,self.position.y.value,'o',ms=ms,**kwargs)
plt.xlabel('projected separation [AU]')
plt.ylabel('projected separation [AU]')
if rmax is not None:
plt.xlim((-rmax,rmax))
plt.ylim((-rmax,rmax))
if log:
plt.xscale('log')
plt.yscale('log')
[docs] def save_hdf(self,filename,path=''):
"""
Saves all relevant data to .h5 file; so state can be restored.
"""
self.dataframe.to_hdf(filename,'{}/df'.format(path))
[docs] @classmethod
def from_df(cls, df):
"""Creates an OrbitPopulation from a DataFrame.
:param df:
:class:`pandas.DataFrame` object. Must contain the following
columns: ``['M1','M2','P','ecc','mean_anomaly','obsx','obsy','obsz']``,
i.e., as what is accessed via :attr:`OrbitPopulation.dataframe`.
:return:
:class:`OrbitPopulation`.
"""
return cls(df['M1'], df['M2'], df['P'],
ecc=df['ecc'], mean_anomaly=df['mean_anomaly'],
obsx=df['obsx'], obsy=df['obsy'], obsz=df['obsz'])
[docs] @classmethod
def load_hdf(cls, filename, path=''):
"""Loads OrbitPopulation from HDF file.
:param filename:
HDF file
:param path:
Path within HDF file store where :class:`OrbitPopulation` is saved.
"""
df = pd.read_hdf(filename,'{}/df'.format(path))
return cls.from_df(df)
class BinaryGrid(OrbitPopulation):
def __init__(self, M1, qmin=0.1, qmax=1, Pmin=0.5, Pmax=365, N=1e5, logP=True, eccfn=None):
"""A grid of companions to primary, in mass ratio and period space.
:param M1:
Primary mass [solar masses].
:type M1:
``float``
:param qmin,qmax: (optional)
Minimum and maximum mass ratios.
:param Pmin,Pmax: (optional)
Min/max periods in days.
:param N: (optional)
Total number of simulations. Default = 10^5.
:param logP: (optional)
Whether to grid in log-period. If ``False``, then linear.
:param eccfn: (optional)
Function that returns eccentricity as a function of period.
If ``None``, then eccentricity will be zero.
:type eccfn:
callable
"""
M1s = np.ones(N)*M1
M2s = (rand.random(size=N)*(qmax-qmin) + qmin)*M1s
if logP:
Ps = 10**(rand.random(size=N)*((np.log10(Pmax) - np.log10(Pmin))) + np.log10(Pmin))
else:
Ps = rand.random(size=N)*(Pmax - Pmin) + Pmin
if eccfn is None:
eccs = 0
else:
eccs = eccfn(Ps)
self.eccfn = eccfn
OrbitPopulation.__init__(self,M1s,M2s,Ps,ecc=eccs)
def RV_RMSgrid(self,ts,res=20,mres=None,Pres=None,
conf=0.95,measured_rms=None,drv=0,
plot=True,fig=None,contour=True,sigma=1):
"""Writes a grid of RV RMS values, assuming observations at given times.
Caveat Emptor: Written a long time ago, and
hasn't really been tested.
:param ts:
Times of observations
:param res, mres, Pres: (optional)
Resolution of grids. ``res`` relates to both mass and period;
otherwise ``mres`` and ``Pres`` can be set separately.
:param conf: (optional)
Confidence at which to exclude regions. Used if ``measured_rms``
is ``None``.
:param measured_rms: (optional)
Measured RV RMS, if applicable [not sure exactly how this is used]
:param drv: (optional)
Uncertainties in RV to simulate.
:param plot: (optional)
Whether to plot result.
:param fig: (optional)
Passed to :func:`plotutils.setfig`.
:param contour: (optional)
Whether to plot contours.
:param sigma: (optional)
Level at which to exclude, based on ``measured_rms``.
"""
RVs = self.RV_timeseries(ts)
RVs += rand.normal(size=np.size(RVs)).reshape(RVs.shape)*drv
rms = RVs.std(axis=0)
if mres is None:
mres = res
if Pres is None:
Pres = res
mbins = np.linspace(self.M2.min(),self.M2.max(),mres+1)
Pbins = np.logspace(np.log10(self.P.min()),np.log10(self.P.max()),Pres+1)
logPbins = np.log10(Pbins)
mbin_centers = (mbins[:-1] + mbins[1:])/2.
logPbin_centers = (logPbins[:-1] + logPbins[1:])/2.
minds = np.digitize(self.M2,mbins)
Pinds = np.digitize(self.P,Pbins)
pctiles = np.zeros((mres,Pres))
ns = np.zeros((mres,Pres))
for i in np.arange(mres):
for j in np.arange(Pres):
w = np.where((minds==i+1) & (Pinds==j+1))
these = rms[w]
n = size(these)
ns[i,j] = n
if measured_rms is not None:
pctiles[i,j] = (these > sigma*measured_rms).sum()/float(n)
else:
inds = np.argsort(these)
pctiles[i,j] = these[inds][int((1-conf)*n)]
Ms,logPs = np.meshgrid(mbin_centers,logPbin_centers)
if plot:
setfig(fig)
if contour:
mbin_centers = (mbins[:-1] + mbins[1:])/2.
logPbins = np.log10(Pbins)
logPbin_centers = (logPbins[:-1] + logPbins[1:])/2.
if measured_rms is not None:
levels = [0.68,0.95,0.99]
else:
levels = np.arange(0,20,2)
c = plt.contour(logPbin_centers,mbin_centers,pctiles,levels=levels,colors='k')
plt.clabel(c, fontsize=10, inline=1)
else:
extent = [np.log10(self.P.min()),np.log10(self.P.max()),self.M2.min(),self.M2.max()]
im = plt.imshow(pctiles,cmap='Greys',extent=extent,aspect='auto')
fig = plt.gcf()
ax = plt.gca()
if measured_rms is None:
cbarticks = np.arange(0,21,2)
else:
cbarticks = np.arange(0,1.01,0.1)
cbar = fig.colorbar(im, ticks=cbarticks)
plt.xlabel('Log P')
plt.ylabel('M2')
return mbins,Pbins,pctiles,ns