123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288 |
- from helper_functions import pretty, make_dirs, slice_volume, files_in_dir, files_ind_dir_wo_string, make_volume
- from nipype.interfaces import ants
- from os import path
- from shutil import rmtree
- import sys
- import getopt
- # Help message.
- tab = ' '
- usage_message = '\nCOMMAND:\n' + tab + sys.argv[0] + \
- '\n\nOPTIONS:\n' + \
- tab + '-h, help menu\n' + \
- tab + '-e, --template, <input template>\n' + \
- tab + '-s, --slices, <directory with input slices>\n' + \
- tab + '-r, --rawvolume, <raw volume of slices>\n' + \
- tab + '-t, --inittrans, <initial transform, roughly transforming raw slice volume to match input ' \
- 'volume, manually created>\n' + \
- tab + '-d, --id, <brain id>\n' + \
- tab + '-o, --output, <output directory, will create new directory if not present>\n' + \
- tab + '-l, --log, [bool]>\n'
- def registration(fixed, moving, result, reg_type, sigma, shrink, param, dim, initial_trans='', inverse_result=''):
- """
- Registration function. Registeres the moving image to the fixed with given parameters. If initial transform is not
- given then the images are aligned by center of mass. Outputs inversely transformed result if given.
- :param fixed: Path to fixed image.
- :param moving: Path to moving image.
- :param result: Path to result image.
- :param reg_type: Registration type. List containing, Rigid, Affine, and/or SyN.
- :param sigma: List of sigma parameters. Ordered according to the reg_type parameter.
- :param shrink: List of shrink parameters. Ordered according to the reg_type parameter.
- :param param: List of transform parameters. Ordered according to the reg_type parameter.
- :param dim: Dimension of images. Typically 2 or 3.
- :param initial_trans: Optional path to initial moving transform. If not given, the images will be matched initially
- by aligning the center of mass.
- :param inverse_result: Optional path to the inversely transformed fixed image.
- :return: Returns nothing.
- """
- # Extract number of registrations.
- steps = len(reg_type)
- # Add 1000 iterations for each step.
- if steps == 1:
- iterations = [[1000] * len(sigma[0])]
- else:
- iterations = []
- for i, reg in enumerate(reg_type):
- iteration = [1000] * len(sigma[i])
- iterations.append(iteration)
- # Create registration instance.
- reg_instance = ants.Registration(dimension=dim,
- transforms=reg_type,
- transform_parameters=param,
- metric=['MeanSquares']*steps,
- interpolation='BSpline',
- fixed_image=[fixed],
- moving_image=[moving],
- metric_weight=[1]*steps,
- radius_or_number_of_bins=[32]*steps,
- number_of_iterations=iterations,
- smoothing_sigmas=sigma,
- shrink_factors=shrink,
- convergence_threshold=[1.e-7],
- sampling_strategy=['Regular']*steps,
- use_histogram_matching=True,
- winsorize_lower_quantile=0.05,
- winsorize_upper_quantile=0.95,
- write_composite_transform=True,
- output_warped_image=result,
- output_transform_prefix=result.split('.')[0] + '-Transformation',
- num_threads=12)
- # Add initial moving transform if given, else match by center of mass.
- if not initial_trans == '':
- reg_instance.inputs.initial_moving_transform = initial_trans
- else:
- reg_instance.inputs.initial_moving_transform_com = 1
- # Output reverse results if path is given.
- if not inverse_result == '':
- reg_instance.inputs.output_inverse_warped_image = inverse_result
- # Run registration.
- reg_instance.run()
- def co_register(template, slices, raw_volume, initial_transform, out_dir, brain_id, r=3, a=2, nl=3, print_log=False):
- """
- Function for reconstructing brain volume by iteratively registering to given template. The slices are stacked into a
- 3-dimensional volume that is registered to the template. The template is inversely transformed and sliced, resulting
- in slice pairs with a template slice and an input slice. For each pair, the input slice is registered to the
- template slice, and all the resulting registered slices are stacked to a volume. This process is repeated r times
- rigid, a times rigid and affine, and nl times rigid, affine, and non-linear. Returns the path to the final
- reconstructed brain volume.
- :param template: Path to template used as a reference for brain reconstruction.
- :param slices: List of paths to slices that needs to be reconstructed.
- :param raw_volume: Path to the initial stacked volume, needed for first rigid registration step.
- :param initial_transform:Initial 3-dimensional transform of the raw volume to the template. If empty string given,
- they will be matched by the center of mass.
- :param out_dir: Output directory.
- :param brain_id: Mouse id.
- :param r: Int [Default: 3]. Number of rigid registrations.
- :param a: Int [Default: 2]. Number of rigid and affine registrations.
- :param nl: Int [Default: 3]. Number of rigid, affine, and non-linear registrations.
- :param print_log: Boolean [Default: False]. Will print log if true.
- :return: Return path to the final reconstructed brain volume.
- """
- # Determine whether the reference template is the Allen atlas (1. iteration) or the DAPI-template (2. iteration).
- if 'allen' in template:
- template_type = 'allen'
- else:
- template_type = 'dapi'
- volume_postfix = '_hv_reg_' + template_type + '_' + brain_id
- # List of slices.
- raw_slices = files_in_dir(slices, '.nii')
- if print_log:
- print('Volume given interpreted as:', template_type)
- print('Checking paths.')
- if not path.exists(raw_volume):
- print('The path:', raw_volume, 'does not exist.')
- if not path.exists(template):
- print('The path:', template, 'does not exist.')
- if not path.exists(initial_transform):
- print('The path:', initial_transform, 'does not exist.')
- if print_log:
- print('Defining iterations.')
- reg_types = ['Rigid'] * r + ['Affine'] * a + ['SyN'] * nl
- if print_log:
- print(reg_types)
- # Run through iterations.
- for i, iteration in enumerate(reg_types):
- cur_work_dir = out_dir + pretty(i) + '/'
- make_dirs(cur_work_dir)
- if print_log:
- print('Running iteration', pretty(i))
- print('Current work dir:', cur_work_dir)
- if i == 0:
- hv = raw_volume
- else:
- hv = out_dir + pretty(i) + volume_postfix + '.nii'
- if print_log:
- print('Deleting prev. iteration work dir:', out_dir + pretty(i - 1) + '/')
- rmtree(out_dir + pretty(i - 1) + '/')
- # Iteration specific registration parameters.
- if iteration == 'Rigid':
- reg = ['Rigid']
- params = [(0.25,)]
- vol_sigma = [[4, 4, 2, 2]]
- vol_shrink = [[32, 16, 8, 4]]
- slice_sigma = [[4, 4, 2, 2]]
- slice_shrink = [[32, 16, 8, 4]]
- if i == 0:
- vol_sigma = [[4, 4, 4]]
- vol_shrink = [[32, 16, 8]]
- slice_sigma = [[4, 4, 2]]
- slice_shrink = [[32, 16, 8]]
- elif iteration == 'Affine':
- reg = ['Rigid', 'Affine']
- params = [(0.25,), (0.25,), ]
- vol_sigma = [[4, 4, 2, 2], [4, 4, 2, 2]]
- vol_shrink = [[32, 16, 8, 4], [32, 16, 8, 4]]
- slice_sigma = [[4, 4, 2, 2], [4, 4, 2, 2]]
- slice_shrink = [[32, 16, 8, 4], [32, 16, 8, 4]]
- elif iteration == 'SyN':
- reg = ['Rigid', 'Affine', 'SyN']
- params = [(0.25,), (0.25,), (0.15, 3, 0.5)]
- vol_sigma = [[4, 4, 2, 2], [4, 4, 2, 2], [4, 4, 4, 2]]
- vol_shrink = [[32, 16, 8, 4], [32, 16, 8, 4], [64, 32, 16, 8]]
- slice_sigma = [[4, 4, 2, 2], [4, 4, 2, 2], [4, 4, 4, 2, 2]]
- slice_shrink = [[32, 16, 8, 4], [32, 16, 8, 4], [64, 32, 16, 8, 4]]
- if i == len(reg_types)-1:
- slice_sigma = [[4, 4, 2, 2], [4, 4, 2, 2], [4, 4, 4, 2, 2, 1]]
- slice_shrink = [[32, 16, 8, 4], [32, 16, 8, 4], [64, 32, 16, 8, 4, 2]]
- else:
- sys.exit('The registration type "' + str(iteration) + '" does not exist. Reconstruction stopped.')
- # Define name and path for registered template to hv.
- cur_reg_name = pretty(i) + '-' + template_type + '_reg_to_hv.nii'
- cur_reg_path = cur_work_dir + cur_reg_name
- if print_log:
- print('Registering hv to given volume - either MR or atlas')
- if not path.exists(cur_reg_path):
- registration(template, hv, cur_work_dir + 'hv_to_template.nii', reg, vol_sigma,
- vol_shrink, params, dim=3, initial_trans=initial_transform,
- inverse_result=cur_reg_path)
- # Define and create directory for sliced template.
- template_slice_dir = cur_work_dir + cur_reg_name.split('.')[0] + '-slices/'
- make_dirs(template_slice_dir)
- if print_log:
- print('Slicing', cur_reg_path, 'into', template_slice_dir)
- # Slice template.
- slice_volume(cur_reg_path, template_slice_dir)
- template_slices = files_in_dir(template_slice_dir, '.nii')
- if len(raw_slices) == len(template_slices):
- print('Number of images matches.')
- else:
- sys.exit('Mismatch between number of images.')
- # Make directory for slice to slice registrations.
- reg_slices_dir = cur_work_dir + pretty(i) + '-raw_reg_template/'
- make_dirs(reg_slices_dir)
- # Registered slice postfix.
- reg_slice_postfix = '_reg'
- # Perform slice to slice registrations.
- for n, (raw_slice, template_slice) in enumerate(zip(raw_slices, template_slices)):
- reg_slice = reg_slices_dir + pretty(i) + '_S' + pretty(n) + reg_slice_postfix + '.nii'
- if not path.exists(reg_slice):
- registration(template_slice, raw_slice, reg_slice, reg,
- slice_sigma, slice_shrink, params, 2)
- # Stack registered slices into new volume.
- reg_slices = files_ind_dir_wo_string(reg_slices_dir, exclusion_string='-Transformation')
- final_volume = out_dir + pretty(i + 1) + volume_postfix + '.nii'
- make_volume(reg_slices, final_volume)
- # Return final reconstructed brain volume.
- return final_volume
- if __name__ == '__main__':
- arguments = sys.argv
- log = False
- if len(arguments) == 1:
- print(usage_message)
- else:
- try:
- opts, args = getopt.getopt(arguments[1:],
- "he:s:r:t:d:o:l:",
- ["template=", "slices=", "rawvolume", "inittrans", "id", "output", "log="])
- except getopt.GetoptError:
- print('\nSomething is not right in your syntax. Tak a look at your possibilities:\n', usage_message)
- sys.exit(2)
- for opt, arg in opts:
- if opt == "-h":
- print(usage_message)
- sys.exit()
- elif opt in ("-e", "--template"):
- input_template = arg
- elif opt in ("-s", "--slices"):
- input_slices = arg
- elif opt in ("-r", "--rawvolume"):
- input_raw_volume = arg
- elif opt in ("-t", "--inittrans"):
- input_init_trans = arg
- elif opt in ("-d", "--id"):
- input_brain_id = arg
- elif opt in ("-o", "--output"):
- output_dir = arg
- elif opt in ("-l", "--log"):
- if arg == 'True' or arg == 'true':
- input_print_log = True
- elif arg == 'False' or arg == 'false':
- input_print_log = False
- else:
- print('\nSomething is not right in your syntax. Tak a look at your possibilities:\n', usage_message)
- sys.exit(2)
- try:
- co_register(input_template,
- input_slices,
- input_raw_volume,
- input_init_trans,
- output_dir,
- input_brain_id,
- print_log=input_print_log)
- except NameError:
- print('\nYou are not (properly) defining all the input variables. Tak a look at what is needed:\n',
- usage_message)
|