write.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
  1. import pandas as pd
  2. from moviepy.editor import ImageSequenceClip
  3. import multiprocessing
  4. import logging
  5. import pathlib as pl
  6. import numpy as np
  7. import tifffile
  8. def get_extension_from_codec(codec):
  9. codec_map = {"libx264": ".mp4",
  10. "ayuv": ".avi"
  11. }
  12. return codec_map[codec]
  13. class MovieWriter(object):
  14. def __init__(self, flags):
  15. super().__init__()
  16. self.speed_factor = flags["mv_SpeedFactor"]
  17. self.bitrate = flags["mv_bitrate"]
  18. self.codec = flags["mv_exportFormat"]
  19. def get_clip(self, data_numpy_list, data_sampling_period):
  20. data_fps = pd.Timedelta("1s") / data_sampling_period
  21. video_fps = self.speed_factor * data_fps
  22. data_numpy_list_for_moviepy = []
  23. for frame_data_numpy in data_numpy_list:
  24. # need to swap axes as our axis order is XY and moviepy expects YX
  25. frame_data_numpy_swapped = frame_data_numpy.swapaxes(0, 1)
  26. # need to convert it to 8 bit from float
  27. frame_data_numpy_swapped_uint8 = np.array(frame_data_numpy_swapped * 255, dtype=np.uint8)
  28. # flip Y since origin in moviepy is top left
  29. frame_data_for_clip = np.flip(frame_data_numpy_swapped_uint8, axis=0)
  30. data_numpy_list_for_moviepy.append(frame_data_for_clip)
  31. clip = ImageSequenceClip(data_numpy_list_for_moviepy, fps=video_fps)
  32. return clip
  33. def write(self, data_numpy_list, data_sampling_period, full_filename_without_extension):
  34. if data_sampling_period == pd.Timedelta(0):
  35. raise ValueError("Error saving movie! The inter frame period was either not specified or set to 0, "
  36. "the frame-per-second value for movie output could therefore not be calculated.\n"
  37. "Tip: You can save the movie as a TIFF-Stack by setting the flags 'mv_exportFormat' in "
  38. "the tab 'movie' to 'stack_tif' and view the resulting TIFF-Stack in ImageJ")
  39. clip = self.get_clip(data_numpy_list, data_sampling_period)
  40. out_name = f"{full_filename_without_extension}{get_extension_from_codec(self.codec)}"
  41. ffmpeg_params = []
  42. if self.codec == "libx264":
  43. ffmpeg_params = ["-crf", '1']
  44. clip.write_videofile(filename=out_name,
  45. codec=self.codec,
  46. ffmpeg_params=ffmpeg_params,
  47. preset="veryslow",
  48. threads=multiprocessing.cpu_count() - 1,
  49. logger="bar",
  50. bitrate=self.bitrate
  51. )
  52. logging.getLogger("VIEW").info(f"Wrote a movie: {out_name}")
  53. return out_name
  54. class MovieWriterIndividualTif(MovieWriter):
  55. def __init__(self, flags):
  56. super().__init__(flags)
  57. def write(self, data_numpy_list, data_sampling_period, full_filename_without_extension):
  58. if data_sampling_period == pd.Timedelta(0):
  59. data_sampling_period = pd.Timedelta("1s") # fake value, as it will not be written
  60. clip = self.get_clip(data_numpy_list, data_sampling_period)
  61. out_dir_path = pl.Path(full_filename_without_extension)
  62. if not out_dir_path.is_dir():
  63. out_dir_path.mkdir()
  64. filename_format = f"{str(out_dir_path / out_dir_path.name)}%03d.tif"
  65. clip.write_images_sequence(nameformat=filename_format,
  66. logger="bar")
  67. logging.getLogger("VIEW").info(f"Wrote a sequence of images to the folder {str(out_dir_path)}")
  68. return out_dir_path
  69. class MovieWriterStackTif(object):
  70. def __init__(self):
  71. super().__init__()
  72. def write(self, data_numpy_list, data_sampling_period, full_filename_without_extension):
  73. # each image in data_numpy_list is of the format X,Y,Color.
  74. # Stacking them to get 4D data in the format Z, X, Y, Color
  75. data_4D = np.stack(data_numpy_list, axis=0)
  76. # Flip Y since origin in tif is top left
  77. data_4D_Y_flipped = np.flip(data_4D, axis=2)
  78. # convert to uint8
  79. data_4D_uint8 = np.array(data_4D_Y_flipped * 255, dtype=np.uint8)
  80. # convert to format Z, Y, X, color
  81. data_4D_XY_swapped = data_4D_uint8.swapaxes(1, 2)
  82. # convert to TZCYXS format required by imagej
  83. data_4D_formatted = data_4D_XY_swapped[np.newaxis, :, np.newaxis, :, :]
  84. outfile_path = f"{full_filename_without_extension}.tif"
  85. tifffile.imwrite(outfile_path, data=data_4D_formatted, imagej=True)
  86. logging.getLogger("VIEW").info(f"Wrote a tiff stack to {str(outfile_path)}")
  87. return outfile_path
  88. def get_writer(flags):
  89. if flags["mv_exportFormat"] == "single_tif":
  90. return MovieWriterIndividualTif(flags)
  91. elif flags["mv_exportFormat"] == "stack_tif":
  92. return MovieWriterStackTif()
  93. else:
  94. return MovieWriter(flags)