Source code for sandp.Alls2
"""
For single electron analysis
"""
import numpy as np
import pandas as pd
from root_numpy import root2array
import os
import matplotlib.pyplot as plt
from multihist import Histdd, Hist1d
from .utils import run_number_to_file_s, folders_to_file_s, judge_str
from tqdm import tqdm
[docs]def get_all_vector(ndarr):
all_arr = []
for arr in ndarr:
[all_arr.append(e) for e in arr]
return np.array(all_arr)
[docs]def get_all_vector_as_scalar(ndarr):
"""
only select the first entry of a vector if it's non-empty
"""
all_arr = []
for arr in ndarr:
if len(arr) > 0:
[all_arr.append(arr[0]) for _ in arr]
return np.array(all_arr)
[docs]def get_all_scalar(arr1, ndarr2):
"""
vectorize a scalar based on another vector
"""
all_arr = []
for arr1_, arr in zip(arr1, ndarr2):
if len(arr) > 0:
[all_arr.append(arr1_) for _ in arr]
return np.array(all_arr)
[docs]def get_all_vector_by_other_vector(ndarr1, ndarr2):
"""
get vector as scalar based on another vector, and assign nan if that vector is empty
"""
all_arr = []
for arr1, arr2 in zip(ndarr1, ndarr2):
if len(arr2) > 0:
if len(arr1) == 0:
arr1 = [np.nan]
[all_arr.append(arr1[0]) for _ in arr2]
return np.array(all_arr)
[docs]def get_max(ndarr):
all_arr = []
for arr in ndarr:
if len(arr) > 0:
all_arr.append(max(arr))
return np.array(all_arr)
[docs]def to_new_df(data, amplifier=True):
"""
Make dataframe for all S2s. It provides:
- run_number: number of the run
- event_id: event number in the run
- event_time: when event happened in unix time in seconds
- x: x position of event in arbitrary unit (a.u.)
- y: y position of event in a.u.
- s2: s2 size, PE
- main_s2: size of main s2 for the event where the s2 is, PE
- s2_width_50: s2 width of 50 percent area in us
- s2_width_90: s2 width of 90 percent area in us
- s2_rise_time: s2 rise time in us
- s2_drop_time: s2 drop time in us
- s1: s1 size in PE
- s1_time: center time of main s1 in us
- s2_time: center time of s2 in us
- main_s2_time: center time of main s2 in us
- s2_delay_time: how long s2 is delayed after main s2, us
"""
event_id = get_all_scalar(data.EventID, data.S2sPeak)
event_time = get_all_scalar(data.UnixTime, data.S2sPeak) # s
x = get_all_vector(data.S2sPosX)
y = get_all_vector(data.S2sPosY)
if not amplifier:
amp = 1
else:
amp = 10
sample_to_mu = 1 / 250
main_s2 = get_all_vector_as_scalar(data.S2sTot) / amp # get main s2 in the event which s2 is in
s2 = get_all_vector(data.S2sTot) / amp # amplifier
s1 = get_all_vector_by_other_vector(data.S1sTot, data.S2sPeak) / amp # amplifier
s2_width_50 = get_all_vector(data.S2sWidth) * sample_to_mu # us
s2_width_90 = get_all_vector(data.S2sLowWidth) * sample_to_mu # us
s2_rise_time = get_all_vector(data.S2sRiseTime) * sample_to_mu # us
s2_drop_time = get_all_vector(data.S2sDropTime) * sample_to_mu # us
s1_time = get_all_vector_by_other_vector(data.S1sPeak, data.S2sPeak) * sample_to_mu # us
s2_time = get_all_vector(data.S2sPeak) * sample_to_mu # us
main_s2_time = get_all_vector_as_scalar(data.S2sPeak) * sample_to_mu # us
s2_delay_time = s2_time - main_s2_time # us
df = pd.DataFrame({'event_id': event_id,
'event_time': event_time,
'x': x,
'y': y,
's2': s2,
'main_s2': main_s2,
's2_width_50': s2_width_50,
's2_width_90': s2_width_90,
's2_rise_time': s2_rise_time,
's2_drop_time': s2_drop_time,
's1': s1,
's1_time': s1_time,
's2_time': s2_time,
'main_s2_time': main_s2_time,
's2_delay_time': s2_delay_time})
return df
[docs]def load_data(file):
data = pd.DataFrame(root2array(file, 'T1'))
return data
[docs]def load(input, processor='sandp_test'):
"""load data into all s2 format. Input can be either folder name or run number"""
is_string = judge_str(input)
if is_string:
run_info = folders_to_file_s(input, processor)
else:
run_info = run_number_to_file_s(input, processor)
data = pd.DataFrame()
for run in tqdm(run_info, desc='load single e data'):
data_tmp = to_new_df(load_data(run['file_location']), amplifier=run['amplifier_on'])
data_tmp['run_number'] = run['run_number']
data = pd.concat([data, data_tmp], ignore_index=True)
return data