In [2]:
import glob
import os
import time

import matplotlib.pyplot as plt
import nixio
import numpy as np
%matplotlib inline 
%config InlineBackend.figure_formats = ['retina'] # only for users with a high resolution display 

## Exercise 1

1. Run the code below and check for the file size.
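2. Replace the line ``nixfile = nixio.File.open("radar_trap.nix", nixio.FileMode.Overwrite)`` in the code below with ``nixfile = nixio.File.open("radar_trap.nix", nixio.FileMode.Overwrite, compression=nixio.Compression.DeflateNormal)`` and run the code again.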
3. Compare the file sizes: after step 1 the file should be about 80 MB, after step 2 it should be about 20 MB.

In [None]:

def generate_number_plate():
    """Create a random number plate such as ``AB-CD-123``.

    Four upper-case letters are drawn (with replacement) and split into two
    pairs, followed by a random integer in the range 0..998. The number is
    not zero-padded.

    Returns
    -------
    str
        The number plate in the form ``<2 letters>-<2 letters>-<number>``.
    """
    alphabet = list('ABCDEFGHIJKLMNOPQRSTUVWXYZ')
    picked = np.random.choice(alphabet, 4)
    first_pair = "".join(picked[:2])
    second_pair = "".join(picked[2:])
    # randint(999) excludes the upper bound, i.e. the largest number is 998
    number = np.random.randint(999)
    return "%s-%s-%i" % (first_pair, second_pair, number)


def get_image():
    """Load one randomly chosen radar-trap picture from the ``resources`` folder.

    Returns
    -------
    numpy.ndarray
        The pixel data of the chosen image as returned by ``plt.imread``.

    Raises
    ------
    FileNotFoundError
        If no file matches ``resources/radar_trap_*.png``.
    """
    pattern = os.path.join("resources", "radar_trap_*.png")
    # glob returns its matches in arbitrary order; sorting them makes the
    # random pick reproducible for a given random seed.
    imgs = sorted(glob.glob(pattern))
    if not imgs:
        # fail with a clear message instead of the obscure error that
        # np.random.choice raises for an empty list
        raise FileNotFoundError("no radar trap images found matching '%s'" % pattern)
    img = np.random.choice(imgs)
    return plt.imread(img)


def radar_trap_data(duration=1000, car_probability=0.15, dt=0.01, speed_limit=50, speeder_probability=0.01):
    """Simulate the measurements of a radar trap.

    Parameters
    ----------
    duration : number
        End of the simulated time axis (exclusive); the axis starts at 0.
    car_probability : float
        Scaled by ``dt`` to give the probability that a car passes in a single time step.
    dt : float
        Step size of the simulated time axis.
    speed_limit : number
        Speed around which the car speeds scatter.
    speeder_probability : float
        Fraction of the cars that are turned into speeders.

    Returns
    -------
    tuple
        ``(car_times, car_speeds, speeders, fines, number_plates, pictures)`` where
        ``speeders`` holds the indices of the speeding cars in ``car_times`` /
        ``car_speeds`` and ``fines``, ``number_plates`` and ``pictures`` contain
        one entry per speeder.
    """
    time_axis = np.arange(0, duration, dt)
    # a car passes in a given time step with probability car_probability * dt
    car_passes = np.random.random(len(time_axis)) < (car_probability * dt)
    car_times = time_axis[car_passes]
    n_cars = len(car_times)
    # speeds scatter around the limit with a standard deviation of 5% of the limit
    car_speeds = speed_limit + np.random.randn(n_cars) * 0.05 * speed_limit

    # select a random subset of the cars and add half the speed limit to their speed
    shuffled = np.arange(n_cars, dtype=int)
    np.random.shuffle(shuffled)
    n_speeders = int(np.round(speeder_probability * n_cars))
    speeders = shuffled[:n_speeders]
    car_speeds[speeders] += 0.5 * speed_limit
    fines = car_speeds[speeders] * 1.5 + 15  # Euros ;)

    # one number plate and one picture per speeder
    number_plates, pictures = [], []
    for _ in range(len(fines)):
        number_plates.append(generate_number_plate())
        pictures.append(get_image())

    return car_times, car_speeds, speeders, fines, number_plates, pictures

times, speeds, speeders, fines, number_plates, pictures = radar_trap_data(duration=5000)

# store data to nix
nixfile = nixio.File.open("radar_trap.nix", nixio.FileMode.Overwrite)
try:
    block = nixfile.create_block("radar trap", "speed_measurements")

    # one speed per passing car; the passing times are the ticks of the time dimension
    speed_array = block.create_data_array("car speeds", "nix.irregular_sampled", data=speeds, label="speed", unit="km/h")
    speed_array.append_range_dimension(ticks=times, unit="s", label="time")

    # times at which the speeding cars passed; used as positions of the multi tag below
    speeder_times = times[speeders]
    speeder_times_array = block.create_data_array("speeder times", "nix.events", data=speeder_times, label="time", unit="s")
    speeder_times_array.append_set_dimension()

    speeder_fines_array = block.create_data_array("speeder fines", "nix.data.collection", data=fines, label="fines", unit="EUR")
    speeder_fines_array.append_set_dimension()

    speeder_number_plates = block.create_data_array("number plates", "nix.data.collection", dtype=nixio.DataType.String, data=number_plates)
    speeder_number_plates.append_set_dimension()

    # rearrange the picture data into one np.array with the first dimension representing the number of positions/speeders
    picture_data = np.stack(pictures, axis=0)
    picture_array = block.create_data_array("pictures", "nix.data.collection.images", data=picture_data)
    picture_array.append_set_dimension()
    picture_array.append_sampled_dimension(1, label="height", unit="pixel")
    picture_array.append_sampled_dimension(1, label="width", unit="pixel")
    picture_array.append_set_dimension(labels=["R", "G", "B", "A"])

    # the multi tag marks the speeder times in the speed measurements ...
    speeder_tag = block.create_multi_tag("Speeders", "nix.event_detection", positions=speeder_times_array)
    speeder_tag.references.append(speed_array)

    # ... and links fine, number plate and picture to each tagged position
    speeder_tag.create_feature(speeder_fines_array, nixio.LinkType.Indexed)
    speeder_tag.create_feature(speeder_number_plates, nixio.LinkType.Indexed)
    speeder_tag.create_feature(picture_array, nixio.LinkType.Indexed)
finally:
    # always close the file, even if one of the steps above fails, so that it
    # is not left open in the running kernel
    nixfile.close()

## Exercise 2: Store data with calibration

In [None]:
# NOTE(review): `time` and `data` are presumably the time axis and the original
# signal created by the exercise's recording step, which is not part of this cell.
# In the visible notebook `time` is the imported `time` module and `data` is not
# defined, so both must be created before this cell can run - confirm.
nixfile = nixio.File.open("data_acquisition.nix", nixio.FileMode.ReadOnly)
try:
    block = nixfile.blocks["session 1"]
    data_array = block.data_arrays["measurement"]

    # read the first 1000 values while the file is still open
    read_data = data_array[:1000]
finally:
    # close the file even if the block or data array cannot be found
    nixfile.close()

# the data read from file is plotted with a time offset of 0.05 -
# presumably so that both curves can be told apart; confirm
plt.plot(time[:1000] + 0.05, read_data, label="from file")
plt.plot(time[:1000], data[:1000], label="original")
plt.legend()


## Exercise 3: effect of chunking on write performance

In [13]:

def record_data(samples, channels, dt):
    """Create noisy, phase-shifted sine waves that mimic a multichannel recording.

    Parameters
    ----------
    samples : int
        Number of samples per channel.
    channels : int
        Number of channels; the phases are spread evenly over one full period.
    dt : float
        Time between two consecutive samples.

    Returns
    -------
    numpy.ndarray
        Array of shape ``(samples, channels)`` with one sine wave plus Gaussian
        noise (standard deviation 0.1) per column.
    """
    timestamps = np.arange(samples) * dt
    signals = np.zeros((samples, channels))
    for channel in range(channels):
        phase_shift = channel * 2 * np.pi / channels
        noise = np.random.randn(samples) * 0.1
        signals[:, channel] = np.sin(2 * np.pi * timestamps + phase_shift) + noise
    return signals


def write_nixfile(filename, chunk_samples=1000, number_of_channels=10, dt=0.001, chunk_count=100, compression=nixio.Compression.No):
    """Write simulated multichannel data to a NIX file chunk by chunk and time the writing.

    Parameters
    ----------
    filename : str
        Name of the file to write; an existing file is overwritten.
    chunk_samples : int
        Number of samples (rows) written per chunk.
    number_of_channels : int
        Number of channels (columns) of the data.
    dt : float
        Sampling interval in seconds; used to generate the data and stored as
        the sampling interval of the time dimension.
    chunk_count : int
        Number of chunks to write.
    compression : nixio.Compression
        Compression setting passed on to ``nixio.File.open``.

    Returns
    -------
    float
        Time in seconds spent writing the chunks. Creating the file and the
        data array, generating the data and closing the file are not included.
    """
    nixfile = nixio.File.open(filename, nixio.FileMode.Overwrite, compression=compression)
    try:
        block = nixfile.create_block("Session 1", "nix.recording_session")
        data_array = block.create_data_array(
            "multichannel_data", "nix.sampled.multichannel", dtype=nixio.DataType.Double,
            shape=(chunk_samples, number_of_channels), label="voltage", unit="mV")
        # store the actual sampling interval instead of a hard-coded 0.001
        data_array.append_sampled_dimension(dt, label="time", unit="s")
        data_array.append_set_dimension(labels=["channel %i" % i for i in range(number_of_channels)])

        total_samples = chunk_count * chunk_samples
        data = record_data(total_samples, number_of_channels, dt)

        # perf_counter is monotonic and therefore better suited for measuring durations
        t0 = time.perf_counter()
        for chunk_index in range(chunk_count):
            start_index = chunk_samples * chunk_index
            chunk = data[start_index:start_index + chunk_samples, :]
            if chunk_index == 0:
                # the array was created with the shape of one chunk; fill it first
                data_array.write_direct(chunk)
            else:
                # every further chunk extends the array along the time axis
                data_array.append(chunk, axis=0)
        total_time = time.perf_counter() - t0
    finally:
        # close the file even if writing fails
        nixfile.close()
    return total_time

# Write the same total number of samples with different chunk sizes and
# record how long the writing takes for each of them.
total_samples = 10 ** 6  # 1 Mio
chunk_sizes = [50, 100, 500, 1000, 5000, 10000, 50000, 100000]
chunk_counts = []
times = []
for cs in chunk_sizes:
    # number of chunks needed to reach total_samples with this chunk size
    cc = total_samples // cs
    chunk_counts.append(cc)
    time_needed = write_nixfile("chunking_test.nix", chunk_samples=cs, chunk_count=cc)
    times.append(time_needed)
    print(cs, cc, time_needed)
