123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119 |
- import itertools
- import pathlib
- import pandas as pd
- import numpy as np
- import scipy
- import spikeinterface.full as si
- import json
- import csv
- from scipy.io import loadmat
- import neo
- from bep032tools.generator.BEP032Generator import BEP032Data
- from bep032tools.generator.utils import save_json, save_tsv
- # b = BEP032Data()
- # b.generate_metadata_file_channels()
class BIDSGenerator(BEP032Data):
    """Generate a BIDS/BEP032 representation of an ephys dataset.

    Extends :class:`BEP032Data` with metadata-file generators whose
    content is pulled from the session's source data folder: a
    neo-readable recording, plus a probe-description ``.mat`` file
    located two directory levels above it.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # The probe channel map is needed by several generators below,
        # so load it once up front.
        self.probe_name, self.probe_sources = self.load_probe_source_data()

    def generate_metadata_file_channels(self, output):
        """Write the channels metadata table to *output*.

        Channel information is read from the neo IO header of the
        source recording.

        Raises
        ------
        ValueError
            If a channel id does not map onto a probe contact.
        """
        recording_folder = self.custom_metadata_sources['source_data_folder']
        neo_reader = neo.get_io(recording_folder)
        neo_channels = neo_reader.header['signal_channels']
        df = pd.DataFrame.from_records(neo_channels)
        # Map neo header fields onto the BEP032 channel vocabulary.
        df.rename(columns={'name': 'channel_id',
                           'id': 'channel_name',
                           'sampling_rate': 'sampling_frequency',
                           'units': 'unit'},
                  inplace=True)
        df['type'] = 'EXT'
        # Every channel must have a corresponding contact on the probe.
        # Use an explicit exception instead of `assert` so the check
        # survives `python -O`.
        channel_contacts = df['channel_id'].str.extract(r'(\d+)').astype(int)
        if not np.isin(channel_contacts, self.probe_sources['chanMap0ind']).all():
            raise ValueError(
                'Some channels have no corresponding probe contact.')
        df.set_index('channel_id', inplace=True)
        save_tsv(df, output)

    def generate_metadata_file_probes(self, output):
        """Write the probes metadata table to *output*.

        Only a single Neuropixel probe was used in this experiment.
        """
        df = pd.DataFrame(columns=['probe_id', 'type'],
                          data=[[f'probe-{self.probe_name}', 'Neuropixel']])
        df.set_index('probe_id', inplace=True)
        save_tsv(df, output)

    def generate_metadata_file_contacts(self, output):
        """Write the contacts metadata table to *output*.

        The table is derived from the probe ``.mat`` source loaded at
        construction time; columns are renamed to the BEP032 vocabulary.
        """
        df = self.probe_sources.copy()
        df.rename(columns={'chanMap0ind': 'contact_id',
                           'shankInd': 'shank_id',
                           'chanMap': '1-indexed-contact_id',
                           'xcoords': 'x',
                           'ycoords': 'y'},
                  inplace=True)
        df.set_index('contact_id', inplace=True)
        save_tsv(df, output)

    def generate_metadata_file_dataset_description(self, output):
        """Write the dataset description JSON to *output*."""
        mdict = {'author': ['Alice A', ' Bob B']}
        save_json(mdict, output)

    def generate_metadata_file_participants(self, output):
        """Write the participants table (single subject) to *output*."""
        df = pd.DataFrame(columns=['subject_id'], data=['sub-' + self.sub_id])
        df.set_index('subject_id', inplace=True)
        save_tsv(df, output)

    def generate_metadata_file_sessions(self, output):
        """Write the sessions table (single session) to *output*."""
        df = pd.DataFrame(columns=['session_id'], data=['ses-' + self.ses_id])
        df.set_index('session_id', inplace=True)
        save_tsv(df, output)

    def generate_metadata_file_ephys(self, output):
        """Write the ephys-specific metadata JSON to *output*."""
        mdict = {'PowerLineFrequency': 60}
        save_json(mdict, output)

    def load_probe_source_data(self):
        """Load the probe description from the session's ``.mat`` file.

        The file is expected two directory levels above the source data
        folder; exactly one ``.mat`` file must be present there.

        Returns
        -------
        tuple
            ``(probe_name or None, DataFrame)`` where the DataFrame holds
            one column per per-contact array in the ``.mat`` file.

        Raises
        ------
        ValueError
            If zero or more than one ``.mat`` file is found.
        """
        sources_folder = self.custom_metadata_sources['source_data_folder'].parents[1]
        mat_files = list(sources_folder.glob('*.mat'))
        if len(mat_files) != 1:
            raise ValueError(
                f'Expected exactly one .mat probe file in {sources_folder}, '
                f'found {len(mat_files)}.')
        # `loadmat` is already imported at module level; avoid relying on
        # `import scipy` implicitly exposing the `scipy.io` submodule.
        mat = loadmat(mat_files[0])
        df = pd.DataFrame()
        for key, values in mat.items():
            # Skip matlab bookkeeping keys ('__header__', ...) and the
            # scalar probe name, which does not fit the per-contact table.
            if key.startswith('__') or key == 'name':
                continue
            df[key] = values.flatten()
        probe_name = mat['name'][0] if 'name' in mat else None
        return probe_name, df
if __name__ == '__main__':
    # Loop over all source recordings and generate a BIDS representation
    # for each subject/session found under ../sourcedata.
    for sub_path in pathlib.Path('.').glob('../sourcedata/sub-*'):
        sub_id = sub_path.name.split('sub-')[-1]
        for ses_path in sub_path.glob('ses-*'):
            ses_id = ses_path.name.split('ses-')[-1]
            # NOTE(review): the class reads `self.custom_metadata_sources`
            # (plural) in two methods, but this call originally passed
            # `custom_metadata_source` (singular), which would leave that
            # attribute unset. Keyword renamed to match the attribute —
            # confirm against the BEP032Data constructor signature.
            gen = BIDSGenerator(
                sub_id, ses_id,
                custom_metadata_sources={'source_data_folder': ses_path})
            gen.basedir = pathlib.Path('../rawdata')
            gen.register_data_sources(ses_path)
            gen.generate_directory_structure()
            gen.organize_data_files(mode='move', autoconvert='nwb')
            gen.generate_all_metadata_files()
|