import numpy as np
import pandas as pd
from tardis.energy_input.energy_source import (
positronium_continuum,
)
from tardis.energy_input.GXPacket import (
GXPacketCollection,
)
from tardis.energy_input.samplers import sample_energy
from tardis.energy_input.util import (
H_CGS_KEV,
doppler_factor_3d,
get_index,
get_random_unit_vector,
)
from tardis.transport.montecarlo.packet_source import BasePacketSource
[docs]
class RadioactivePacketSource(BasePacketSource):
def __init__(
self,
packet_energy,
gamma_ray_lines,
positronium_fraction,
inner_velocities,
outer_velocities,
inv_volume_time,
times,
energy_df_rows,
effective_times,
taus,
parents,
average_positron_energies,
average_power_per_mass,
**kwargs,
):
self.packet_energy = packet_energy
self.gamma_ray_lines = gamma_ray_lines
self.positronium_fraction = positronium_fraction
self.inner_velocities = inner_velocities
self.outer_velocities = outer_velocities
self.inv_volume_time = inv_volume_time
self.times = times
self.energy_df_rows = energy_df_rows
self.effective_times = effective_times
self.taus = taus
self.parents = parents
self.average_positron_energies = average_positron_energies
self.average_power_per_mass = average_power_per_mass
self.energy_plot_positron_rows = np.empty(0)
super().__init__(**kwargs)
[docs]
def create_packet_mus(self, no_of_packets, *args, **kwargs):
return super().create_packet_mus(no_of_packets, *args, **kwargs)
[docs]
def create_packet_radii(
self, no_of_packets, inner_velocity, outer_velocity
):
"""Initialize the random radii of packets in a shell
Parameters
----------
packet_count : int
Number of packets in the shell
inner_velocity : float
Inner velocity of the shell
outer_velocity : float
Outer velocity of the shell
Returns
-------
array
Array of length packet_count of random locations in the shell
"""
z = np.random.random(no_of_packets)
initial_radii = (
z * inner_velocity**3.0 + (1.0 - z) * outer_velocity**3.0
) ** (1.0 / 3.0)
return initial_radii
[docs]
def create_packet_nus(
self,
no_of_packets,
energy,
intensity,
positronium_fraction,
positronium_energy,
positronium_intensity,
):
"""Create an array of packet frequency-energies (i.e. E = h * nu)
Parameters
----------
no_of_packets : int
Number of packets to produce frequency-energies for
energy : One-dimensional Numpy Array, dtype float
Array of frequency-energies to sample
intensity : One-dimensional Numpy Array, dtype float
Array of intensities to sample
positronium_fraction : float
The fraction of positrons that form positronium
positronium_energy : array
Array of positronium frequency-energies to sample
positronium_intensity : array
Array of positronium intensities to sample
Returns
-------
array
Array of sampled frequency-energies
array
Positron creation mask
"""
nu_energies = np.zeros(no_of_packets)
positrons = np.zeros(no_of_packets)
zs = np.random.random(no_of_packets)
for i in range(no_of_packets):
nu_energies[i] = sample_energy(energy, intensity)
# positron
if nu_energies[i] == 511:
# positronium formation 25% of the time if fraction is 1
if zs[i] < positronium_fraction and np.random.random() < 0.25:
nu_energies[i] = sample_energy(
positronium_energy, positronium_intensity
)
positrons[i] = 1
return nu_energies, positrons
[docs]
def create_packet_directions(self, no_of_packets):
"""Create an array of random directions
Parameters
----------
no_of_packets : int
Number of packets to produce directions for
Returns
-------
array
Array of direction vectors
"""
directions = np.zeros((3, no_of_packets))
for i in range(no_of_packets):
directions[:, i] = get_random_unit_vector()
return directions
[docs]
def create_packet_energies(self, no_of_packets, energy):
"""Create the uniform packet energy for a number of packets
Parameters
----------
no_of_packets : int
Number of packets
energy : float
The packet energy
Returns
-------
array
Array of packet energies
"""
return np.ones(no_of_packets) * energy
[docs]
def calculate_energy_factors(self, no_of_packets, start_time, decay_times):
"""Calculates the factors that adjust the energy of packets emitted
before the first time step and moves those packets to the earliest
possible time
Parameters
----------
no_of_packets : int
Number of packets
start_time : float
First time step
decay_times : array
Packet decay times
Returns
-------
array
Energy factors
array
Adjusted decay times
"""
energy_factors = np.ones(no_of_packets)
for i in range(no_of_packets):
if decay_times[i] < start_time:
energy_factors[i] = decay_times[i] / start_time
decay_times[i] = start_time
return energy_factors, decay_times
[docs]
def create_packets(self, decays_per_isotope, *args, **kwargs):
"""Initialize a collection of GXPacket objects for the simulation
to operate on.
Parameters
----------
decays_per_isotope : array int64
Number of decays per simulation shell per isotope
Returns
-------
list
List of GXPacket objects
array
Array of main output dataframe rows
array
Array of plotting output dataframe rows
array
Array of positron output dataframe rows
"""
number_of_packets = decays_per_isotope.sum().sum()
decays_per_shell = decays_per_isotope.sum().values
locations = np.zeros((3, number_of_packets))
directions = np.zeros((3, number_of_packets))
packet_energies_rf = np.zeros(number_of_packets)
packet_energies_cmf = np.zeros(number_of_packets)
nus_rf = np.zeros(number_of_packets)
nus_cmf = np.zeros(number_of_packets)
shells = np.zeros(number_of_packets)
times = np.zeros(number_of_packets)
# set packets to IN_PROCESS status
statuses = np.ones(number_of_packets, dtype=np.int64) * 3
positronium_energy, positronium_intensity = positronium_continuum()
self.energy_plot_positron_rows = np.zeros((number_of_packets, 4))
packet_index = 0
# go through each shell
for shell_number, pkts in enumerate(decays_per_shell):
isotope_packet_count_df = decays_per_isotope.T.iloc[shell_number]
for isotope_name, isotope_packet_count in zip(
self.gamma_ray_lines.keys(), isotope_packet_count_df.values
):
isotope_energy = self.gamma_ray_lines[isotope_name][0, :]
isotope_intensity = self.gamma_ray_lines[isotope_name][1, :]
isotope_positron_fraction = self.calculate_positron_fraction(
self.average_positron_energies[isotope_name],
isotope_energy,
isotope_intensity,
)
tau_start = self.taus[isotope_name]
if isotope_name in self.parents:
tau_end = self.taus[self.parents[isotope_name]]
else:
tau_end = 0
# sample radii at time = 0
initial_radii = self.create_packet_radii(
isotope_packet_count,
self.inner_velocities[shell_number],
self.outer_velocities[shell_number],
)
# sample directions (valid at all times)
initial_directions = self.create_packet_directions(
isotope_packet_count
)
# packet decay time
initial_times = self.create_packet_times_uniform_energy(
isotope_packet_count,
tau_start,
tau_end,
decay_time_min=0,
decay_time_max=self.times[-1],
)
# get the time step index of the packets
initial_time_indexes = np.array(
[
get_index(decay_time, self.times)
for decay_time in initial_times
]
)
# get the time of the middle of the step for each packet
packet_effective_times = np.array(
[self.effective_times[i] for i in initial_time_indexes]
)
# scale radius by packet decay time. This could be replaced with
# Geometry object calculations. Note that this also adds a random
# unit vector multiplication for 3D. May not be needed.
initial_locations = (
initial_radii
* packet_effective_times
* self.create_packet_directions(isotope_packet_count)
)
# get the packet shell index
initial_shells = np.ones(isotope_packet_count) * shell_number
# the individual gamma-ray energies that make up a packet
# co-moving frame, including positronium formation
initial_nu_energies_cmf, positron_mask = self.create_packet_nus(
isotope_packet_count,
isotope_energy,
isotope_intensity,
self.positronium_fraction,
positronium_energy,
positronium_intensity,
)
# equivalent frequencies
initial_nus_cmf = initial_nu_energies_cmf / H_CGS_KEV
# compute scaling factor for packets emitted before start time
# and move packets to start at that time
# probably not necessary- we have rejection sampling in the
# create_packet_times_uniform_energy method
energy_factors, initial_times = self.calculate_energy_factors(
isotope_packet_count, self.times[0], initial_times
)
# the CMF energy of a packet scaled by the "early energy factor"
initial_packet_energies_cmf = (
self.create_packet_energies(
isotope_packet_count, self.packet_energy
)
* energy_factors
)
# rest frame gamma-ray energy and frequency
# this probably works fine without the loop
initial_packet_energies_rf = np.zeros(isotope_packet_count)
initial_nus_rf = np.zeros(isotope_packet_count)
for i in range(isotope_packet_count):
doppler_factor = doppler_factor_3d(
initial_directions[:, i],
initial_locations[:, i],
initial_times[i],
)
initial_packet_energies_rf[i] = (
initial_packet_energies_cmf[i] / doppler_factor
)
initial_nus_rf[i] = initial_nus_cmf[i] / doppler_factor
self.energy_plot_positron_rows[i] = np.array(
[
packet_index,
isotope_positron_fraction * self.packet_energy,
# * inv_volume_time[packet.shell, decay_time_index],
initial_radii[i],
initial_times[i],
]
)
packet_index += 1
# deposit positron energy
for time in initial_time_indexes:
self.energy_df_rows[shell_number, time] += (
isotope_positron_fraction * self.packet_energy
)
# collect packet properties
locations[
:, packet_index - isotope_packet_count : packet_index
] = initial_locations
directions[
:, packet_index - isotope_packet_count : packet_index
] = initial_directions
packet_energies_rf[
packet_index - isotope_packet_count : packet_index
] = initial_packet_energies_rf
packet_energies_cmf[
packet_index - isotope_packet_count : packet_index
] = initial_packet_energies_cmf
nus_rf[
packet_index - isotope_packet_count : packet_index
] = initial_nus_rf
nus_cmf[
packet_index - isotope_packet_count : packet_index
] = initial_nus_cmf
shells[
packet_index - isotope_packet_count : packet_index
] = initial_shells
times[
packet_index - isotope_packet_count : packet_index
] = initial_times
return GXPacketCollection(
locations,
directions,
packet_energies_rf,
packet_energies_cmf,
nus_rf,
nus_cmf,
statuses,
shells,
times,
)
[docs]
def calculate_positron_fraction(
self, positron_energy, isotope_energy, isotope_intensity
):
"""Calculate the fraction of energy that an isotope
releases as positron kinetic energy
Parameters
----------
positron_energy : float
Average kinetic energy of positrons from decay
isotope_energy : numpy array
Photon energies released by the isotope
isotope_intensity : numpy array
Intensity of photon energy release
Returns
-------
float
Fraction of energy released as positron kinetic energy
"""
return positron_energy / np.sum(isotope_energy * isotope_intensity)
[docs]
class GammaRayPacketSource(BasePacketSource):
def __init__(
self,
packet_energy,
gamma_ray_lines,
positronium_fraction,
inner_velocities,
outer_velocities,
inv_volume_time,
times,
energy_df_rows,
effective_times,
taus,
parents,
average_positron_energies,
average_power_per_mass,
**kwargs,
):
self.packet_energy = packet_energy
self.gamma_ray_lines = gamma_ray_lines
self.positronium_fraction = positronium_fraction
self.inner_velocities = inner_velocities
self.outer_velocities = outer_velocities
self.inv_volume_time = inv_volume_time
self.times = times
self.energy_df_rows = energy_df_rows
self.effective_times = effective_times
self.taus = taus
self.parents = parents
self.average_positron_energies = average_positron_energies
self.average_power_per_mass = average_power_per_mass
self.energy_plot_positron_rows = np.empty(0)
super().__init__(**kwargs)
[docs]
def create_packet_mus(self, no_of_packets, *args, **kwargs):
return super().create_packet_mus(no_of_packets, *args, **kwargs)
[docs]
def create_packet_radii(self, sampled_packets_df):
"""Initialize the random radii of packets in a shell
Parameters
----------
packet_count : int
Number of packets in the shell
sampled_packets_df : pd.DataFrame
Dataframe where each row is a packet
Returns
-------
array
Array of length packet_count of random locations in the shell
"""
z = np.random.random(len(sampled_packets_df))
initial_radii = (
z * sampled_packets_df["inner_velocity"] ** 3.0
+ (1.0 - z) * sampled_packets_df["outer_velocity"] ** 3.0
) ** (1.0 / 3.0)
return initial_radii
[docs]
def create_packet_nus(
self,
no_of_packets,
packets,
positronium_fraction,
positronium_energy,
positronium_intensity,
):
"""Create an array of packet frequency-energies (i.e. E = h * nu)
Parameters
----------
no_of_packets : int
Number of packets to produce frequency-energies for
packets : pd.DataFrame
DataFrame of packets
positronium_fraction : float
The fraction of positrons that form positronium
positronium_energy : array
Array of positronium frequency-energies to sample
positronium_intensity : array
Array of positronium intensities to sample
Returns
-------
array
Array of sampled frequency-energies
"""
energy_array = np.zeros(no_of_packets)
zs = np.random.random(no_of_packets)
for i in range(no_of_packets):
# positron
if packets.iloc[i]["decay_type"] == "bp":
# positronium formation 75% of the time if fraction is 1
if zs[i] < positronium_fraction and np.random.random() < 0.75:
energy_array[i] = sample_energy(
positronium_energy, positronium_intensity
)
else:
energy_array[i] = 511
else:
energy_array[i] = packets.iloc[i]["radiation_energy_kev"]
return energy_array
[docs]
def create_packet_directions(self, no_of_packets):
"""Create an array of random directions
Parameters
----------
no_of_packets : int
Number of packets to produce directions for
Returns
-------
array
Array of direction vectors
"""
directions = np.zeros((3, no_of_packets))
for i in range(no_of_packets):
directions[:, i] = get_random_unit_vector()
return directions
[docs]
def create_packet_energies(self, no_of_packets, energy):
"""Create the uniform packet energy for a number of packets
Parameters
----------
no_of_packets : int
Number of packets
energy : float
The packet energy
Returns
-------
array
Array of packet energies
"""
return np.ones(no_of_packets) * energy
[docs]
def create_packets(
self, decays_per_isotope, number_of_packets, *args, **kwargs
):
"""Initialize a collection of GXPacket objects for the simulation
to operate on.
Parameters
----------
decays_per_isotope : array int64
Probability of decays per simulation shell per isotope per time step
number_of_packets : int
Number of packets to create
Returns
-------
GXPacketCollection
"""
# initialize arrays for most packet properties
locations = np.zeros((3, number_of_packets))
directions = np.zeros((3, number_of_packets))
packet_energies_rf = np.zeros(number_of_packets)
packet_energies_cmf = np.zeros(number_of_packets)
nus_rf = np.zeros(number_of_packets)
nus_cmf = np.zeros(number_of_packets)
times = np.zeros(number_of_packets)
# set packets to IN_PROCESS status
statuses = np.ones(number_of_packets, dtype=np.int64) * 3
self.energy_plot_positron_rows = np.zeros((number_of_packets, 4))
# compute positronium continuum
positronium_energy, positronium_intensity = positronium_continuum()
# sample packets from dataframe, returning a dataframe where each row is
# a sampled packet
sampled_packets_df = decays_per_isotope.sample(
n=number_of_packets,
weights="decay_energy_erg",
replace=True,
random_state=np.random.RandomState(self.base_seed),
)
# get unique isotopes that have produced packets
isotopes = pd.unique(sampled_packets_df.index.get_level_values(2))
# compute the positron fraction for unique isotopes
isotope_positron_fraction = self.calculate_positron_fraction(isotopes)
# get the packet shell index
shells = sampled_packets_df.index.get_level_values(1)
# get the inner and outer velocity boundaries for each packet to compute
# the initial radii
sampled_packets_df["inner_velocity"] = self.inner_velocities[shells]
sampled_packets_df["outer_velocity"] = self.outer_velocities[shells]
# sample radii at time = 0
initial_radii = self.create_packet_radii(sampled_packets_df)
# get the time step index of the packets
initial_time_indexes = sampled_packets_df.index.get_level_values(0)
# get the time of the middle of the step for each packet
packet_effective_times = np.array(
[self.effective_times[i] for i in initial_time_indexes]
)
# packet decay time
times = self.create_packet_times_uniform_energy(
number_of_packets,
sampled_packets_df.index.get_level_values(2),
packet_effective_times,
)
# scale radius by packet decay time. This could be replaced with
# Geometry object calculations. Note that this also adds a random
# unit vector multiplication for 3D. May not be needed.
locations = (
initial_radii
* packet_effective_times
* self.create_packet_directions(number_of_packets)
)
# sample directions (valid at all times), non-relativistic
directions = self.create_packet_directions(number_of_packets)
# the individual gamma-ray energy that makes up a packet
# co-moving frame, including positronium formation
nu_energies_cmf = self.create_packet_nus(
number_of_packets,
sampled_packets_df,
self.positronium_fraction,
positronium_energy,
positronium_intensity,
)
# equivalent frequencies
nus_cmf = nu_energies_cmf / H_CGS_KEV
# per packet co-moving frame total energy
packet_energies_cmf = self.create_packet_energies(
number_of_packets, self.packet_energy
)
# rest frame gamma-ray energy and frequency
# this probably works fine without the loop
# non-relativistic
packet_energies_rf = np.zeros(number_of_packets)
nus_rf = np.zeros(number_of_packets)
for i in range(number_of_packets):
doppler_factor = doppler_factor_3d(
directions[:, i],
locations[:, i],
times[i],
)
packet_energies_rf[i] = packet_energies_cmf[i] / doppler_factor
nus_rf[i] = nus_cmf[i] / doppler_factor
# deposit positron energy in both output arrays
# this is an average across all packets that are created
# it could be changed to be only for packets that are from positrons
self.energy_plot_positron_rows[i] = np.array(
[
i,
isotope_positron_fraction[sampled_packets_df["isotopes"][i]]
* packet_energies_cmf[i],
# this needs to be sqrt(sum of squares) to get radius
np.linalg.norm(locations[i]),
times[i],
]
)
# this is an average across all packets that are created
# it could be changed to be only for packets that are from positrons
self.energy_df_rows[shells[i], times[i]] += (
isotope_positron_fraction[sampled_packets_df["isotopes"][i]]
* packet_energies_cmf[i]
)
return GXPacketCollection(
locations,
directions,
packet_energies_rf,
packet_energies_cmf,
nus_rf,
nus_cmf,
statuses,
shells,
times,
)
[docs]
def calculate_positron_fraction(self, isotopes):
"""Calculate the fraction of energy that an isotope
releases as positron kinetic energy
Parameters
----------
isotopes : array
Array of isotope names as strings
Returns
-------
dict
Fraction of energy released as positron kinetic energy per isotope
"""
positron_fraction = {}
for isotope in isotopes:
isotope_energy = self.gamma_ray_lines[isotope][0, :]
isotope_intensity = self.gamma_ray_lines[isotope][1, :]
positron_fraction[isotope] = self.average_positron_energies[
isotope
] / np.sum(isotope_energy * isotope_intensity)
return positron_fraction