Source code for sandp.utils

"""Convert root into pandas dataframe
"""
from root_numpy import root2array
import pandas as pd
import numpy as np
import os
from textwrap import dedent
from pymongo import MongoClient
from tqdm import tqdm


def load_dataframe(filename, amplifier=10, treename='T1'):
    """Convert a processed ROOT file into a pandas dataframe.

    Branches whose first entry has no ``__len__`` are copied as they are.
    The remaining branches are read through the ``(name, fill_value)`` form of
    ``root2array`` with NaN as fill value, except for the S1/S2 branches listed
    in ``two_entry_branches``: those are read with length 2 and stored under
    two friendlier column names (e.g. ``s1`` / ``largest_other_s1``).

    :param filename: file name including the path (handed to ``root2array``)
    :param amplifier: factor the S1/S2 area columns are divided by; a falsy
        value (``0``, ``None``) disables the division
    :param treename: name of the tree to read; used for every read of the file
        so that files holding more than one tree are handled consistently
    :return data: data in pandas dataframe format, with the time columns
        converted from samples to us and the alias columns ``drift_time`` and
        ``r`` added
    """
    data_1 = pd.DataFrame(root2array(filename, treename))

    # Branches that are always copied as-is, without the array check below
    # (original note on this list: "these two are not scalar").
    scalar_branches = ['BaseLineChannel', 'BaseLineChannelSigma', 'S2sPMT']

    # Branches from which the first two entries are kept, with the names of
    # the two resulting columns. Single source of truth for both the skip list
    # in the classification loop and the reads further down.
    two_entry_branches = [
        ('S1sTot', ['s1', 'largest_other_s1']),
        ('S2sTot', ['s2', 'largest_other_s2']),
        ('S1sPeak', ['s1_time', 'alt_s1_time']),
        ('S2sPeak', ['s2_time', 'alt_s2_time']),
        ('S2sPosX', ['x', 'alt_s2_x']),
        ('S2sPosY', ['y', 'alt_s2_y']),
    ]
    column_two = [branch for branch, _ in two_entry_branches]

    # Classify the remaining branches by looking at the first row only.
    array_branches = []
    for name in data_1.columns.values:
        if name in scalar_branches:
            continue
        if hasattr(data_1[name][0], '__len__'):
            if name in column_two:
                continue
            array_branches.append((name, np.nan))  # TODO add length to different array
        else:
            scalar_branches.append(name)

    data_2 = pd.DataFrame(root2array(filename, treename, branches=array_branches))
    frames = [data_1[scalar_branches], data_2]

    # Add S1 S2 info including second s1 and s2
    for branch, column_names in two_entry_branches:
        frames.append(pd.DataFrame(
            data=root2array(filename, treename, branches=(branch, np.nan, 2)),
            columns=column_names))

    # One column-wise concat instead of re-copying the frame for every branch.
    data = pd.concat(frames, axis=1)

    # convert time from sample to us
    sample_to_us = 4 / 1e3
    time_columns = ['s1_time', 'alt_s1_time', 's2_time', 'alt_s2_time',
                    'S1sWidth', 'S1sLowWidth', 'S2sLowWidth', 'S2sWidth',
                    'S1sRiseTime', 'S1sDropTime', 'S2sRiseTime', 'S2sDropTime']
    for column in time_columns:
        data.loc[:, column] *= sample_to_us

    # divide signal by the amplifier factor
    # TODO: just for temporal purpose. put this in processor in the future and remove these lines
    if amplifier:
        signal_columns = ['s1', 'largest_other_s1', 's2', 'largest_other_s2']
        for column in signal_columns:
            data.loc[:, column] /= amplifier

    # add alias
    data['drift_time'] = data['s2_time'] - data['s1_time']  # us
    data['r'] = np.sqrt(data['x'] ** 2 + data['y'] ** 2)
    return data
def load_path(path, amplifier=10):
    """Load all root files found under one directory or a list of directories.

    :param path: a directory name, or a list of directory names (lists may be
        nested, each element is loaded recursively)
    :param amplifier: amplifier factor handed to ``load_dataframe`` for every
        directory, including the ones reached through a list
    :return data: pandas dataframe; for a list the per-directory frames are
        concatenated with a fresh index, and an empty list gives an empty frame
    """
    assert isinstance(path, (str, list)), "path should be either string or list type"

    if isinstance(path, str):
        root_files = [os.path.join(path, name)
                      for name in os.listdir(path) if '.root' in name]
        return load_dataframe(root_files, amplifier)

    # Forward ``amplifier``: without it every sub-path would silently fall
    # back to the default factor.
    frames = [load_path(sub_path, amplifier) for sub_path in path]
    if not frames:
        return pd.DataFrame()
    return pd.concat(frames, ignore_index=True)
def get_coll():
    """Return the ``data`` collection of the ``run`` database in mongodb.

    The password is taken from the ``MONGO_PASSWORD`` environment variable and
    inserted into the connection URI unchanged.
    """
    # NOTE(review): the password is not percent-escaped here; presumably the
    # environment variable holds a URI-safe value - confirm.
    uri = 'mongodb://sandix:%s@132.239.186.12:27017' % os.environ['MONGO_PASSWORD']
    return MongoClient(uri)['run']['data']
def get_datasets():
    """Return every document of the run collection as one pandas dataframe.

    The ``_id`` and ``processed_data_location`` columns are removed from the
    returned frame.
    """
    records = list(get_coll().find())
    hidden_columns = ['_id', 'processed_data_location']
    table = pd.DataFrame(records)
    return table.drop(hidden_columns, axis=1)
def get_processor_version_name(processor):
    """Translate a processor name into the key used inside mongodb documents.

    '.' is not supported in BSON keys, so 'sandix_v1.1' is stored as
    'sandix_v1p1'; the other accepted names are returned unchanged. Only a
    limited set of names is accepted, anything else raises ``ValueError`` as a
    reminder that the name is wrong.
    """
    accepted = (
        ('sandix_v1.1', 'sandix_v1p1'),
        ('sandp_test', 'sandp_test'),
        ('sandp_test_nrd', 'sandp_test_nrd'),
    )
    for processor_name, stored_name in accepted:
        if processor == processor_name:
            return stored_name
    raise ValueError("processor is either 'sandix_v1.1', 'sandp_test' or 'sandp_test_nrd', wanna try again?")
def run_number_to_file_s(run_numbers, processor):
    """Find file path(s) and amplifier condition based on run numbers.

    :param run_numbers: a single run number, or a list / tuple / array-like of
        run numbers (objects offering ``tolist()`` are converted through it)
    :param processor: processor name accepted by ``get_processor_version_name``
    :return run_info: list of dictionaries as built by ``doc_s_to_run_info``
    :raises ValueError: if ``processor`` is not one of the accepted names
    """
    # Validate the processor name first so a typo fails before the database
    # is queried.
    version_name = get_processor_version_name(processor)

    # '$in' needs a plain list. ``tolist()`` on a scalar-like object returns a
    # bare scalar, and a plain int has no ``tolist()`` at all, so wrap anything
    # that is not iterable (or is a string) into a one-element list.
    if hasattr(run_numbers, 'tolist'):
        run_numbers = run_numbers.tolist()
    if isinstance(run_numbers, (str, bytes)) or not hasattr(run_numbers, '__iter__'):
        run_numbers = [run_numbers]
    else:
        run_numbers = list(run_numbers)

    coll = get_coll()
    doc_s = list(coll.find({'run_number': {'$in': run_numbers}}))
    return doc_s_to_run_info(doc_s, version_name)
def doc_s_to_run_info(doc_s, version_name):
    """Collect run info (file location, amplifier_on, run_number) from documents.

    A run is skipped, with a printed message, when its processed file for
    ``version_name`` does not exist on disk or when the document has no
    location recorded for that processor version.

    :param doc_s: iterable of run documents; each needs ``run_number``,
        ``amplifier_on`` and normally ``processed_data_location``
    :param version_name: key inside ``processed_data_location`` to look up
    :return run_info: list of dictionaries with the keys ``file_location``,
        ``amplifier_on`` and ``run_number``
    """
    run_info = []
    for doc in doc_s:
        # ``.get`` so that a run never processed with this version is skipped
        # like a missing file instead of aborting the whole lookup.
        location = doc.get('processed_data_location', {}).get(version_name)
        if not location or not os.path.exists(location):
            print('run: %d is not found, will be skipped' % doc['run_number'])
            continue
        run_info.append({'file_location': location,
                         'amplifier_on': doc['amplifier_on'],
                         'run_number': doc['run_number']})
    return run_info
def get_file_from_path(path):
    """Return the full paths of the '.root' files under the given path(s).

    A string is treated as one directory; a directory that does not exist
    gives an empty list. Anything else must offer ``__len__`` and is walked
    element by element, the per-element results being joined into one flat
    list.
    """
    if not isinstance(path, str):
        assert hasattr(path, '__len__'), "if 'path' is not a string, then it should be an array or list!"
        collected = []
        for entry in path:
            collected.extend(get_file_from_path(entry))
        return collected

    if not os.path.exists(path):
        return []
    return [os.path.join(path, name) for name in os.listdir(path) if '.root' in name]
def folders_to_path(folder, processor):
    """Build the absolute path of folder(s) for a given processor version.

    A string gives one joined path; anything else must offer ``__len__`` and
    gives a list with one result per element.
    """
    version_name = get_processor_version_name(processor)

    # TODO: put this into ini
    known_bases = {
        'sandix_v1p1': '/home/nilab/10T_Two/Processed/Run21/sandp_v1.1/Co57',
        'sandp_test': '/home/nilab/10T_Two/Processed/Run21/sandp_test/SE_update_s1_width_10/Co57/',
    }
    # Every other accepted version falls back to the nrd location.
    fallback_base = '/home/nilab/10T_Two/Processed/Run21/sandp_test/SE_update_s1_width_10_nrd/Co57/'
    base_path = known_bases.get(version_name, fallback_base)

    if not isinstance(folder, str):
        assert hasattr(folder, '__len__'), "if 'folder' is not a string, then it should be an array or list!"
        return [folders_to_path(item, processor) for item in folder]
    return os.path.join(base_path, folder)
def folders_to_file_s(folder, processor):
    """Find files and amplifier conditions based on folder name(s).

    The root files found under the folder(s) are matched against the
    ``processed_data_location`` entry of the run documents for this processor
    version. Returns the list built by ``doc_s_to_run_info``.
    """
    candidate_files = get_file_from_path(folders_to_path(folder, processor))
    collection = get_coll()
    version_name = get_processor_version_name(processor)
    location_key = 'processed_data_location.%s' % version_name
    matches = list(collection.find({location_key: {'$in': candidate_files}}))
    return doc_s_to_run_info(matches, version_name)
def judge_str(input):
    """Tell whether ``input`` is a string or a sized collection of strings.

    Only the first element of a collection is inspected.

    :param input: object to test
    :return: ``True`` for a string or a collection whose first element is a
        string; ``False`` otherwise, including empty collections and objects
        without ``__len__``
    """
    if isinstance(input, str):
        return True
    if not hasattr(input, '__len__'):
        # Always answer with a bool instead of falling through to None.
        return False
    try:
        first = next(iter(input))
    except (StopIteration, TypeError):
        # Empty or not iterable: there is no first element to be a string.
        return False
    return isinstance(first, str)
def load(input, processor='sandix_v1.1'):
    """Load data into a pandas dataframe by run numbers, or by folder name(s).

    :param input: folder name(s) when ``judge_str`` recognises strings,
        otherwise run number(s)
    :param processor: processor name accepted by ``get_processor_version_name``
    :return data: concatenation of all loaded runs with a fresh index and an
        extra ``run_number`` column; an empty dataframe when no run is found
    """
    if judge_str(input):
        run_info = folders_to_file_s(input, processor)
    else:
        run_info = run_number_to_file_s(input, processor)

    frames = []
    for run in tqdm(run_info, desc='load data'):
        # hardcode for now
        amplifier = 10 if run['amplifier_on'] else 1
        frame = load_dataframe(run['file_location'], amplifier=amplifier)
        frame['run_number'] = run['run_number']
        frames.append(frame)

    # Concatenate once at the end: growing a frame inside the loop copies all
    # previously loaded rows again for every run.
    if not frames:
        return pd.DataFrame()
    return pd.concat(frames, ignore_index=True)
def code_hider():
    """Make a button in the jupyter notebook to hide all code.

    Stolen from hax.

    :return: ``IPython.display.HTML`` object holding the toggle script and the
        "Show/hide all code in this notebook" button
    """
    # Stolen from stackoverflow... forget which question
    # I would really like these buttons for every individual cell.. but I don't know how
    from IPython.display import HTML  # Please keep here, don't want hax to depend on ipython!

    # Every javascript statement is terminated explicitly so the script does
    # not depend on line breaks surviving.
    return HTML(dedent('''
        <script>
        code_show=true;
        function code_toggle() {
            if (code_show){
                $('div.input').hide();
            } else {
                $('div.input').show();
            }
            code_show = !code_show;
        }
        $( document ).ready(code_toggle);
        </script>
        <form action="javascript:code_toggle()"><input type="submit" value="Show/hide all code in this notebook"></form>'''))