123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146 |
- from packaging.version import Version
- import numpy as np
- import neo
- import quantities as pq
# --- Size of the generated dataset -----------------------------------------
# Containers
n_segments = 2

# Data objects created inside every segment
n_spiketrains = 8
n_waveform_samples = 10  # NOTE(review): not referenced by the visible code
n_analogsignals = 4
n_irregularlysampledsignals = 1
n_events = 2
n_epochs = 3

# Shared random number generator; the fixed seed makes the sequence of drawn
# values identical on every run.
random_generator = np.random.default_rng(seed=42)
def get_rand(shape=None, min=0, max=1, sorted=False):
    """Draw random numbers from the module-level ``random_generator``.

    Parameters
    ----------
    shape
        Passed unchanged to ``random_generator.random``.
    min, max
        The raw random values are multiplied by ``max - min`` and shifted
        by ``min``.
    sorted
        If true, the rescaled values are passed through ``np.sort`` before
        being returned.

    Returns
    -------
    The rescaled (and optionally sorted) random data.
    """
    # Stretch the raw draw to the requested range in a single expression.
    span = max - min
    scaled = random_generator.random(shape) * span + min

    return np.sort(scaled) if sorted else scaled
def generate_basic_block():
    """Create a neo Block populated with randomly generated example data.

    The block is named ``my_block`` and receives ``n_segments`` segments.
    Every segment is filled with:

    * ``n_spiketrains`` spike trains (10 spike times scaled to 0-10 s,
      ``t_stop`` of 10 s, a ``(10, 14)`` waveform array in volts),
    * ``n_analogsignals`` analog signals (100 samples each; signal ``i`` has
      ``i + 2`` channels and a sampling rate of ``i + 1`` Hz),
    * ``n_irregularlysampledsignals`` irregularly sampled signals
      (100 samples, ``n_analogsignals + 1`` channels),
    * ``n_events`` events with 10 labelled time stamps each,
    * ``n_epochs`` epochs with 3 labelled time stamps each.

    All values come from ``get_rand`` and therefore from the shared, seeded
    random generator, so the order of the calls below determines which random
    numbers end up in which object.

    Returns
    -------
    neo.Block
        The populated block.
    """
    # Channel count of the irregularly sampled signals. This used to be
    # written as ``anasig_idx + 2`` using the loop variable left over from the
    # analog signal loop, which fails with a NameError when no analog signals
    # are generated. The explicit form yields the same value otherwise.
    n_irregular_channels = n_analogsignals + 1

    block = neo.Block(name=r'my_block')

    for seg_idx in range(n_segments):
        seg = neo.Segment(name=f'my_segment_{seg_idx}')
        block.segments.append(seg)

        for spiketrain_idx in range(n_spiketrains):
            # waveforms are drawn before the spike times; keep this order to
            # preserve the generated values
            waveforms = get_rand((10, 14)) * pq.V
            st = neo.SpikeTrain(times=get_rand((10), max=10, sorted=True)*pq.s, t_stop=10*pq.s,
                                name=f'my_spiketrain_{spiketrain_idx}', waveforms=waveforms,
                                left_sweep=4)
            st.segment = seg
            seg.spiketrains.append(st)

        for anasig_idx in range(n_analogsignals):
            anasig = neo.AnalogSignal(signal=get_rand((100, anasig_idx+2), max=1000)*pq.V,
                                      t_start=0*pq.s,
                                      sampling_rate=(anasig_idx+1)*pq.Hz,
                                      name=f'my_analogsignal_{anasig_idx}')
            anasig.segment = seg
            seg.analogsignals.append(anasig)

        for irr_idx in range(n_irregularlysampledsignals):
            irrsig = neo.IrregularlySampledSignal(
                times=get_rand((100), sorted=True, max=100)*pq.s,
                signal=get_rand((100, n_irregular_channels), max=1000)*pq.V,
                t_start=0*pq.s, t_stop=100*pq.s,
                name=f'my_irregularlysampledsignal_{irr_idx}')
            irrsig.segment = seg
            seg.irregularlysampledsignals.append(irrsig)

        for ev_idx in range(n_events):
            event = neo.Event(times=get_rand((10), max=10, sorted=True)*pq.s,
                              labels=np.array([f'my_event_timestamp_{i}' for i in range(10)]),
                              name=f'my_event_{ev_idx}')
            event.segment = seg
            seg.events.append(event)

        for ep_idx in range(n_epochs):
            epoch = neo.Epoch(times=get_rand((3), max=6, sorted=True)*pq.s,
                              durations=[1.1, 2.2, 3.3]*pq.s,
                              labels=np.array([f'my_epoch_timestamp_{i}' for i in range(3)]),
                              name=f'my_epoch_{ep_idx}')
            epoch.segment = seg
            seg.epochs.append(epoch)

    return block
def add_spiketrain_groups(block):
    """Collect the spike trains of ``block`` into neo Groups.

    ``n_spiketrains`` groups named ``my_spiketrain_group_<i>`` are created,
    each restricted to ``neo.SpikeTrain`` objects. The spike train at
    position ``i`` of every segment is added to group ``i`` and gets that
    group assigned as its ``group`` attribute. Finally all groups are
    appended to ``block.groups``.

    The block is modified in place; nothing is returned.
    """
    groups = [
        neo.Group(name=f'my_spiketrain_group_{number}', allowed_types=[neo.SpikeTrain])
        for number in range(n_spiketrains)
    ]

    # link spike trains and groups in both directions
    for segment in block.segments:
        for position, spiketrain in enumerate(segment.spiketrains):
            target = groups[position]
            spiketrain.group = target
            target.spiketrains.append(spiketrain)

    # make the groups part of the block
    block.groups.extend(groups)
def add_spiketrain_units_channel_indexes(block):
    """Link spike trains via neo Units and ChannelIndexes.

    Resulting structure:

    * one ChannelIndex (``my_channel_index_<i>``) per analog signal position,
      holding the analog signal at that position from every segment,
    * one Unit (``my_unit_<i>``) per spike train position, holding the spike
      train at that position from every segment,
    * units ``2k`` and ``2k + 1`` are attached to ChannelIndex ``k``.

    The block is modified in place; nothing is returned.

    Note: the last step indexes the channel indexes with ``idx // 2``, so it
    needs at least ``n_spiketrains / 2`` (rounded up) analog signals.
    """
    units = [neo.Unit(f'my_unit_{number}') for number in range(n_spiketrains)]

    chx_list = []
    for signal_pos in range(n_analogsignals):
        # same signal position, taken from each segment
        members = [segment.analogsignals[signal_pos] for segment in block.segments]
        chx = neo.ChannelIndex([signal_pos], name=f'my_channel_index_{signal_pos}')
        chx.analogsignals.extend(members)
        chx_list.append(chx)
        chx.block = block

    # spike trains <-> units
    for segment in block.segments:
        for position, spiketrain in enumerate(segment.spiketrains):
            unit = units[position]
            spiketrain.unit = unit
            unit.spiketrains.append(spiketrain)

    # channel indexes -> block
    block.channel_indexes.extend(chx_list)

    # units <-> channel indexes, two consecutive units per channel index
    for position, unit in enumerate(units):
        owner = chx_list[position // 2]
        owner.units.append(unit)
        unit.channel_index = owner
if __name__ == '__main__':
    # Generate the example block and store it in a NIX file whose name
    # contains the installed neo version.
    neo_version = Version(neo.__version__)
    print(f'Generating test file for neo version {neo_version} ...', end='')

    block = generate_basic_block()

    # adding spiketrain units to neo block depending on container objects available
    if hasattr(neo, 'Group'):
        add_spiketrain_groups(block)
    elif hasattr(neo, 'ChannelIndex'):
        add_spiketrain_units_channel_indexes(block)
    else:
        raise ValueError(f'No mechanism to represent spike units found for neo version: {neo_version}')

    io = neo.NixIO(f'generated_file_neo{neo_version}.nix')
    try:
        io.write_block(block)
    finally:
        # release the file handle even if writing the block fails
        io.close()

    print('done')
|