123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290 |
- # -*- coding: utf-8 -*-
- """
- Example code for loading and processing of a recording of the reach-
- to-grasp experiments conducted at the Institute de Neurosciences de la Timone
- by Thomas Brochier and Alexa Riehle from the original Blackrock files and
- odML files, using the custom loading routine `ReachGraspIO`.
- Authors: Julia Sprenger, Lyuba Zehl, Michael Denker
- Copyright (c) 2017, Institute of Neuroscience and Medicine (INM-6),
- Forschungszentrum Juelich, Germany
- All rights reserved.
- Redistribution and use in source and binary forms, with or without
- modification, are permitted provided that the following conditions are met:
- * Redistributions of source code must retain the above copyright notice, this
- list of conditions and the following disclaimer.
- * Redistributions in binary form must reproduce the above copyright notice,
- this list of conditions and the following disclaimer in the documentation
- and/or other materials provided with the distribution.
- * Neither the names of the copyright holders nor the names of the contributors
- may be used to endorse or promote products derived from this software without
- specific prior written permission.
- THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
- ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
- DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
- FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
- DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
- SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
- CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
- OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- """
- import os
- import numpy as np
- import matplotlib.pyplot as plt
- import quantities as pq
- from neo import Block, Segment
- from elephant.signal_processing import butter
- from reachgraspio import reachgraspio
- from neo.utils import cut_segment_by_epoch, add_epoch, get_events
- from neo_utils import load_segment
- # =============================================================================
- # Load data
- #
- # As a first step, we partially load the data file into memory as a Neo object.
- # =============================================================================
- # Specify the path to the recording session to load, eg,
- # '/home/user/l101210-001'
- session_name = "i140703-001"
- # session_name = "l101210-001"
- session_path = os.path.join('..', 'datasets_blackrock', session_name)
- odml_dir = os.path.join('..', 'datasets_blackrock')
- # Open the session for reading
- session = reachgraspio.ReachGraspIO(session_path, odml_directory=odml_dir)
- # Read a the complete dataset in lazy mode generating all neo objects,
- # but not loading data into memory. The lazy neo structure will contain objects
- # to capture all recorded data types (time series at 1000Hz (ns2) and 30kHz (ns6)
- # scaled to units of voltage, sorted spike trains, spike waveforms and events)
- # of the recording session and return it as a Neo Block. The
- # time shift of the ns2 signal (LFP) induced by the online filter is
- # automatically corrected for by a heuristic factor stored in the metadata
- # (correct_filter_shifts=True).
- block = session.read_block(lazy=True, correct_filter_shifts=True)
- # Validate there is only a single Segment present in the block
- assert len(block.segments) == 1
- # loading data content of all data objects during the first 300 seconds
- data_segment = load_segment(block.segments[0], time_range=(None, 300*pq.s))
- # =============================================================================
- # Create offline filtered LFP
- #
- # Here, we construct one offline filtered LFP from each ns5 (monkey L) or ns6
- # (monkey N) raw recording trace. For monkey N, this filtered LFP can be
- # compared to the LFPs in the ns2 file (note that monkey L contains only
- # behavioral signals in the ns2 file). Also, we assign telling names to each
- # Neo AnalogSignal, which is used for plotting later on in this script.
- # =============================================================================
- target_channel_id = 62
- nsx_to_anasig_name = {2: 'LFP signal (online filtered)',
- 5: 'raw signal',
- 6: 'raw signal'}
- filtered_anasig = None
- raw_anasig = None
- # identify neuronal signals and provide labels for plotting
- for anasig in data_segment.analogsignals:
- # skip non-neuronal signals
- if not anasig.annotations['neural_signal']:
- continue
- # identify nsx source of signals in this AnalogSignal object
- if 'nsx' in anasig.annotations:
- nsx = anasig.annotations['nsx']
- else:
- nsx = np.unique(anasig.array_annotations['nsx'])
- assert len(nsx) == 1, 'Different nsx sources in AnalogSignal'
- nsx = nsx[0]
- if nsx == 2:
- # AnalogSignal is LFP from ns2
- anasig.name = f'LFP (online filter, ns2)'
- filtered_anasig = anasig
- elif nsx in [5, 6]:
- # AnalogSignal is raw signal from ns5 or ns6
- anasig.name = f'raw (ns{nsx})'
- raw_anasig = anasig
- # Create LFP signal by filtering raw signal if not present already
- if filtered_anasig is None:
- # Use the Elephant library to filter the signal, filter only target channel
- channel_ids = np.asarray(raw_anasig.array_annotations['channel_ids'], dtype=int)
- target_channel_index = np.where(target_channel_id == channel_ids)[0]
- raw_signal = raw_anasig[:, target_channel_index]
- f_anasig = butter(
- raw_signal,
- highpass_freq=None,
- lowpass_freq=250 * pq.Hz,
- order=4)
- if 'nsx' in anasig.annotations:
- nsx = anasig.annotations['nsx']
- else:
- nsx = anasig.array_annotations["nsx"][0]
- f_anasig.name = f'LFP (offline filtered ns{nsx})'
- # Attach all offline filtered LFPs to the segment of data
- data_segment.analogsignals.append(f_anasig)
- # =============================================================================
- # Construct analysis epochs
- #
- # In this step we extract and cut the data into time segments (termed analysis
- # epochs) that we wish to analyze. We contrast these analysis epochs to the
- # behavioral trials that are defined by the experiment as occurrence of a Trial
- # Start (TS-ON) event in the experiment. Concretely, here our analysis epochs
- # are constructed as a cutout of 25ms of data around the TS-ON event of all
- # successful behavioral trials.
- # =============================================================================
- # Get Trial Start (TS-ON) events of all successful behavioral trials
- # (corresponds to performance code 255, which is accessed for convenience and
- # better legibility in the dictionary attribute performance_codes of the
- # ReachGraspIO class).
- #
- # To this end, we filter all event objects of the loaded data to match the name
- # "TrialEvents", which is the Event object containing all Events available (see
- # documentation of ReachGraspIO). From this Event object we extract only events
- # matching "TS-ON" and the desired trial performance code (which are
- # annotations of the Event object).
- start_events = get_events(
- data_segment,
- name='TrialEvents',
- trial_event_labels='TS-ON',
- performance_in_trial=session.performance_codes['correct_trial'])
- print('got start events.')
- # Extract single Neo Event object containing all TS-ON triggers
- assert len(start_events) == 1
- start_event = start_events[0]
- # Construct analysis epochs from 10ms before the TS-ON of a successful
- # behavioral trial to 15ms after TS-ON. The name "analysis_epochs" is given to
- # the resulting Neo Epoch object. The object is not attached to the Neo
- # Segment. The parameter event2 of add_epoch() is left empty, since we are
- # cutting around a single event, as opposed to cutting between two events.
- pre = -10 * pq.ms
- post = 15 * pq.ms
- epoch = add_epoch(
- data_segment,
- event1=start_event, event2=None,
- pre=pre, post=post,
- attach_result=False,
- name='analysis_epochs',
- array_annotations=start_event.array_annotations)
- print('added epoch.')
- # Create new segments of data cut according to the analysis epochs of the
- # 'analysis_epochs' Neo Epoch object. The time axes of all segments are aligned
- # such that each segment starts at time 0 (parameter reset_times); annotations
- # describing the analysis epoch are carried over to the segments. A new Neo
- # Block named "data_cut_to_analysis_epochs" is created to capture all cut
- # analysis epochs. For execution time reason, we are only considering the
- # first 10 epochs here.
- cut_trial_block = Block(name="data_cut_to_analysis_epochs")
- cut_trial_block.segments = cut_segment_by_epoch(
- data_segment, epoch[:10], reset_time=True)
- # =============================================================================
- # Plot data
- # =============================================================================
- # Determine the first existing trial ID i from the Event object containing all
- # start events. Then, by calling the filter() function of the Neo Block
- # "data_cut_to_analysis_epochs" containing the data cut into the analysis
- # epochs, we ask to return all Segments annotated by the behavioral trial ID i.
- # In this case this call should return one matching analysis epoch around TS-ON
- # belonging to behavioral trial ID i. For monkey N, this is trial ID 1, for
- # monkey L this is trial ID 2 since trial ID 1 is not a correct trial.
- trial_id = int(np.min(start_event.array_annotations['trial_id']))
- trial_segments = cut_trial_block.filter(
- targdict={"trial_id": trial_id}, objects=Segment)
- assert len(trial_segments) == 1
- trial_segment = trial_segments[0]
- # Create figure
- fig = plt.figure(facecolor='w')
- time_unit = pq.CompoundUnit('1./30000*s')
- amplitude_unit = pq.microvolt
- nsx_colors = {2: 'k', 5: 'r', 6: 'b'}
- # Loop through all AnalogSignal objects and plot the signal of the target channel
- # in a color corresponding to its sampling frequency (i.e., originating from the ns2/ns5 or ns2/ns6).
- for i, anasig in enumerate(trial_segment.analogsignals):
- # only visualize neural data
- if anasig.annotations['neural_signal']:
- if 'nsx' in anasig.annotations:
- nsx = anasig.annotations['nsx']
- else:
- nsx = anasig.array_annotations['nsx'][0]
- channel_ids = np.asarray(anasig.array_annotations['channel_ids'], dtype=int)
- target_channel_index = np.where(channel_ids == target_channel_id)[0]
- target_signal = anasig[:, target_channel_index]
- plt.plot(
- target_signal.times.rescale(time_unit),
- target_signal.squeeze().rescale(amplitude_unit),
- label=target_signal.name,
- color=nsx_colors[nsx])
- # Loop through all spike trains and plot the spike time, and overlapping the
- # wave form of the spike used for spike sorting stored separately in the nev
- # file.
- for st in trial_segment.spiketrains:
- color = np.random.rand(3,)
- if st.annotations['channel_id'] == target_channel_id:
- for spike_id, spike in enumerate(st):
- # Plot spike times
- plt.axvline(
- spike.rescale(time_unit).magnitude,
- color=color,
- label='Unit ID %i' % st.annotations['unit_id'])
- # Plot waveforms
- waveform = st.waveforms[spike_id, 0, :]
- waveform_times = np.arange(len(waveform))*time_unit + spike
- plt.plot(
- waveform_times.rescale(time_unit).magnitude,
- waveform.rescale(amplitude_unit),
- '--',
- linewidth=2,
- color=color,
- zorder=0)
- # Loop through all events
- for event in trial_segment.events:
- if event.name == 'TrialEvents':
- for ev_id, ev in enumerate(event):
- plt.axvline(
- ev.rescale(time_unit),
- alpha=0.2,
- linewidth=3,
- linestyle='dashed',
- label=f'event {event.array_annotations["trial_event_labels"][ev_id]}')
- # Finishing touches on the plot
- plt.autoscale(enable=True, axis='x', tight=True)
- plt.xlabel(time_unit.name)
- plt.ylabel(amplitude_unit.name)
- plt.legend(loc=4, fontsize=10)
- # Save plot
- file_name = 'example_plot_from_blackrock_%s' % session_name
- for file_format in ['eps', 'png', 'pdf']:
- fig.savefig(file_name + '.%s' % file_format, dpi=400, format=file_format)
|