|
@@ -0,0 +1,146 @@
|
|
|
+from packaging.version import Version
|
|
|
+import numpy as np
|
|
|
+import neo
|
|
|
+import quantities as pq
|
|
|
+
|
|
|
+n_segments = 2
|
|
|
+n_spiketrains = 8
|
|
|
+n_waveform_samples = 10
|
|
|
+n_analogsignals = 4
|
|
|
+n_irregularlysampledsignals = 1
|
|
|
+n_events = 2
|
|
|
+n_epochs = 3
|
|
|
+
|
|
|
+random_generator = np.random.default_rng(seed=42)
|
|
|
+
|
|
|
+
|
|
|
+def get_rand(shape=None, min=0, max=1, sorted=False):
|
|
|
+ data = random_generator.random(shape)
|
|
|
+
|
|
|
+ #rescaling random numbers to min-max range
|
|
|
+ data = data*(max-min) + min
|
|
|
+ if sorted:
|
|
|
+ data = np.sort(data)
|
|
|
+ return data
|
|
|
+
|
|
|
+def generate_basic_block():
|
|
|
+ block = neo.Block(name=r'my_block')
|
|
|
+ for seg_idx in range(n_segments):
|
|
|
+ seg = neo.Segment(name=f'my_segment_{seg_idx}')
|
|
|
+ block.segments.append(seg)
|
|
|
+
|
|
|
+ for spiketrain_idx in range(n_spiketrains):
|
|
|
+ waveforms = get_rand((10, 14)) * pq.V
|
|
|
+ st = neo.SpikeTrain(times=get_rand((10), max=10, sorted=True)*pq.s, t_stop=10*pq.s,
|
|
|
+ name=f'my_spiketrain_{spiketrain_idx}', waveforms=waveforms,
|
|
|
+ left_sweep=4)
|
|
|
+ st.segment = seg
|
|
|
+ seg.spiketrains.append(st)
|
|
|
+
|
|
|
+ for anasig_idx in range(n_analogsignals):
|
|
|
+ anasig = neo.AnalogSignal(signal=get_rand((100, anasig_idx+2),max=1000)*pq.V,
|
|
|
+ t_start=0*pq.s,
|
|
|
+ sampling_rate=(anasig_idx+1)*pq.Hz,
|
|
|
+ name=f'my_analogsignal_{anasig_idx}')
|
|
|
+ anasig.segment = seg
|
|
|
+ seg.analogsignals.append(anasig)
|
|
|
+
|
|
|
+ for irr_idx in range(n_irregularlysampledsignals):
|
|
|
+ irrsig = neo.IrregularlySampledSignal(times=get_rand((100), sorted=True, max=100)*pq.s,
|
|
|
+ signal=get_rand((100,anasig_idx+2),max=1000)*pq.V,
|
|
|
+ t_start=0*pq.s, t_stop=100*pq.s,
|
|
|
+ name=f'my_irregularlysampledsignal_{irr_idx}')
|
|
|
+ irrsig.segment = seg
|
|
|
+ seg.irregularlysampledsignals.append(irrsig)
|
|
|
+
|
|
|
+ for ev_idx in range(n_events):
|
|
|
+ event = neo.Event(times=get_rand((10),max=10,sorted=True)*pq.s,
|
|
|
+ labels=np.array([f'my_event_timestamp_{i}' for i in range(10)]),
|
|
|
+ name=f'my_event_{ev_idx}')
|
|
|
+ event.segment = seg
|
|
|
+ seg.events.append(event)
|
|
|
+
|
|
|
+ for ep_idx in range(n_epochs):
|
|
|
+ epoch = neo.Epoch(times=get_rand((3),max=6,sorted=True)*pq.s,
|
|
|
+ durations=[1.1, 2.2, 3.3]*pq.s,
|
|
|
+ labels=np.array([f'my_epoch_timestamp_{i}' for i in range(3)]),
|
|
|
+ name=f'my_epoch_{ep_idx}')
|
|
|
+ epoch.segment = seg
|
|
|
+ seg.epochs.append(epoch)
|
|
|
+
|
|
|
+ return block
|
|
|
+
|
|
|
+
|
|
|
+def add_spiketrain_groups(block):
|
|
|
+ spiketrain_groups = []
|
|
|
+ for group_idx in range(n_spiketrains):
|
|
|
+ group = neo.Group(name=f'my_spiketrain_group_{group_idx}', allowed_types=[neo.SpikeTrain])
|
|
|
+ spiketrain_groups.append(group)
|
|
|
+
|
|
|
+ # assign spiketrains to groups
|
|
|
+ for seg_idx, seg in enumerate(block.segments):
|
|
|
+ for idx, st in enumerate(seg.spiketrains):
|
|
|
+ st.group = spiketrain_groups[idx]
|
|
|
+ spiketrain_groups[idx].spiketrains.append(st)
|
|
|
+
|
|
|
+ # attach groups to block
|
|
|
+ block.groups.extend(spiketrain_groups)
|
|
|
+
|
|
|
+
|
|
|
+def add_spiketrain_units_channel_indexes(block):
|
|
|
+ # Linking pattern:
|
|
|
+ # 1 Channel_unit - last channel of analog signal
|
|
|
+ # \- 2 spiketrains
|
|
|
+
|
|
|
+ spiketrain_units = []
|
|
|
+ for unit_idx in range(n_spiketrains):
|
|
|
+ unit = neo.Unit(f'my_unit_{unit_idx}')
|
|
|
+ spiketrain_units.append(unit)
|
|
|
+
|
|
|
+ channel_indexes = []
|
|
|
+ for asig_idx in range(n_analogsignals):
|
|
|
+ signals = [seg.analogsignals[asig_idx] for seg in block.segments]
|
|
|
+ channel_index = neo.ChannelIndex([asig_idx], name=f'my_channel_index_{asig_idx}')
|
|
|
+ channel_index.analogsignals.extend(signals)
|
|
|
+ channel_indexes.append(channel_index)
|
|
|
+ channel_index.block = block
|
|
|
+
|
|
|
+
|
|
|
+ # assign spiketrain to units
|
|
|
+ for seg_idx, seg in enumerate(block.segments):
|
|
|
+ for idx, st in enumerate(seg.spiketrains):
|
|
|
+ st.unit = spiketrain_units[idx]
|
|
|
+ spiketrain_units[idx].spiketrains.append(st)
|
|
|
+
|
|
|
+ # link channel_index to block
|
|
|
+ block.channel_indexes.extend(channel_indexes)
|
|
|
+
|
|
|
+ # attach units to channel_indexes
|
|
|
+ for idx in range(n_spiketrains):
|
|
|
+ # link two spiketrains to a single channel_index
|
|
|
+ channel_indexes[idx//2].units.append(spiketrain_units[idx])
|
|
|
+ spiketrain_units[idx].channel_index = channel_indexes[idx//2]
|
|
|
+
|
|
|
+
|
|
|
+if __name__ == '__main__':
|
|
|
+ neo_version = Version(neo.__version__)
|
|
|
+ print(f'Generating test file for neo version {neo_version} ...', end='')
|
|
|
+
|
|
|
+ block = generate_basic_block()
|
|
|
+
|
|
|
+ # adding spiketrain units to neo block depending on container objects available
|
|
|
+ if hasattr(neo, 'Group'):
|
|
|
+ add_spiketrain_groups(block)
|
|
|
+ elif hasattr(neo, 'ChannelIndex'):
|
|
|
+ add_spiketrain_units_channel_indexes(block)
|
|
|
+ else:
|
|
|
+ raise ValueError(f'No mechanism to represent spike units found for neo version: {neo_version}')
|
|
|
+
|
|
|
+ io = neo.NixIO(f'generated_file_neo{neo_version}.nix')
|
|
|
+ io.write_block(block)
|
|
|
+ io.close()
|
|
|
+
|
|
|
+ print('done')
|
|
|
+
|
|
|
+
|
|
|
+
|