#! /usr/bin/env python
# -*- coding: utf-8 -*-
# Simple program for ITK image read/write in Python
# import itk
import numpy as np
import logging
logger = logging.getLogger(__name__)
import os.path
import re
try:
import dicom
except:
import pydicom as dicom
import os.path as op
from . import rawN
from . import misc
# from sys import argv
from . import dcmtools
[docs]def write(data3d, path, filetype='auto', metadata=None):
"""
:param data3d: input ndarray
:param path: output path, if braces are in the name ("dir/file{:04d}.dcm"), image stack is produced .
Check function filename_format() for more details.
:param filetype: dcm, png, h5, ... "image_stack"
:param metadata: metadata f.e. {'voxelsize_mm': [3,2,2]}
:return:
"""
dw = DataWriter()
dw.Write3DData(data3d, path, filetype, metadata)
[docs]class DataWriter:
def __init__(self):
self.stop_writing = False
self.progress_callback = None
def __get_segmentation_path(self, path):
""" Create path with "_segmentation" suffix and keep extension.
:param path:
:return:
"""
startpath, ext = os.path.splitext(path)
segmentation_path = startpath + "_segmentation" + ext
return segmentation_path
[docs] def Write3DData(self, data3d, path, filetype='auto', metadata=None, progress_callback=None, sfin=True):
"""
:param data3d: input ndarray data
:param path: output path, to specify slice number advanced formatting options (like {:06d}) can be used
Check function filename_format() for more details.
:param metadata: {'voxelsize_mm': [1, 1, 1]}
:param filetype: dcm, vtk, rawiv, image_stack
:param progress_callback: fuction for progressbar f.e. callback(value, minimum, maximum)
:param sfin: Use separate file for segmentation if necessary
"""
self.orig_path = path
path = os.path.expanduser(path)
try:
d3d = data3d.pop('data3d')
metadata = data3d
data3d = d3d
except:
pass
if progress_callback is not None:
self.progress_callback = progress_callback
if filetype == 'auto':
startpath, ext = os.path.splitext(path)
filetype = ext[1:]
segmentation = None
if metadata is not None and "segmentation" in metadata.keys():
segmentation_path = self.__get_segmentation_path(path)
segmentation = metadata["segmentation"]
mtd = {'voxelsize_mm': [1, 1, 1]}
if metadata is not None:
mtd.update(metadata)
metadata=mtd
if path.find('{') >= 0:
filetype = 'image_stack'
# one_file_per_slice = True
# if one_file_per_slice:
# self._one_file_per_slice(self, data3d, path, filetype, metadata)
# else:
# self._all_in_one_file(self, data3d, path, filetype, metadata)
#
# def _all_in_one_file(self, data3d, path, filetype, metadata):
if filetype in ['vtk', 'tiff', 'tif']:
self._write_with_sitk(path, data3d, metadata)
if sfin and segmentation is not None:
self._write_with_sitk(segmentation_path, segmentation, metadata)
elif filetype in ['dcm', 'DCM', 'dicom']:
self._write_with_sitk(path, data3d, metadata)
self._fix_sitk_bug(path, metadata)
if sfin and segmentation is not None:
self._write_with_sitk(segmentation_path, segmentation, metadata)
self._fix_sitk_bug(segmentation_path, metadata)
elif filetype in ['rawiv']:
rawN.write(path, data3d, metadata)
elif filetype in ['image_stack']:
self.save_image_stack(data3d, path, metadata)
elif filetype in ['hdf5', 'hdf', 'h5', 'he5']:
self.save_hdf5(data3d, path, metadata)
elif filetype in ['pkl', 'pklz']:
from . import misc
metadata['data3d'] = data3d
datap = metadata
misc.obj_to_file(datap, path)
else:
logger.error('Unknown filetype: "' + filetype + '"')
# data = dicom.read_file(onefile)
def _write_with_sitk(self, path, data3d, metadata):
self._makedirs(path)
import SimpleITK as sitk
dim = dcmtools.get_sitk_image_from_ndarray(data3d)
vsz = metadata['voxelsize_mm']
dim.SetSpacing([vsz[1], vsz[2], vsz[0]])
sitk.WriteImage(dim, path)
def _fix_sitk_bug(self, path, metadata):
"""
There is a bug in simple ITK for Z axis in 3D images. This is a fix
:param path:
:param metadata:
:return:
"""
ds = dicom.read_file(path)
ds.SpacingBetweenSlices = str(metadata["voxelsize_mm"][0])[:16]
dicom.write_file(path, ds)
[docs] def save_hdf5(self, data3d, path, metadata):
# TODO this is not implemented in proper way
import h5py
f = h5py.File(path, "w")
f.create_dataset('data3d', data=data3d, compression='gzip')
met = f.create_group('metadata')
met.create_dataset('voxelsize_mm', data=metadata['voxelsize_mm'], compression='gzip')
# f.create_dataset('metadata', data=metadata, compression='gzip')
f.flush()
f.close()
def __write_h5_key(self, grp, dct):
import h5py
import numpy as np
retval = {}
for key in dct.keys():
try:
data = dct[key]
if type(data) == np.ndarray:
if data.dtype == np.dtype('O'):
logger.warning("problem with dtype('O')")
print("Press 'c' for continue")
from PyQt4 import QtCore; QtCore.pyqtRemoveInputHook()
import ipdb; ipdb.set_trace()
else:
grp.create_dataset(key, data=data)
elif type(data) == list:
grp.create_dataset(key, data=data)
elif type(data) == dict:
subgrp = grp.create_group('key')
self.__write_h5_key(subgrp, data)
else:
grp[key] = data
except:
import traceback
logger.warning(traceback.format_exc())
return retval
[docs] def DataCopyWithOverlay(self, dcmfilelist, out_dir, overlays):
"""
Function make 3D data from dicom file slices
:dcmfilelist list of sorted .dcm files
:overlays dictionary of binary overlays. {1:np.array([...]), 3:...}
:out_dir output directory
"""
dcmlist = dcmfilelist
# data3d = []
for i in range(len(dcmlist)):
onefile = dcmlist[i]
logger.info(onefile)
data = dicom.read_file(onefile)
for i_overlay in overlays.keys():
overlay3d = overlays[i_overlay]
data = self.encode_overlay_slice(data,
overlay3d[-1 - i, :, :],
i_overlay)
# construct output path
head, tail = os.path.split(os.path.normpath(onefile))
filename_out = os.path.join(out_dir, tail)
# save
data.save_as(filename_out)
# import pdb; pdb.set_trace()
[docs] def add_overlay_to_slice_file(
self,
filename,
overlay,
i_overlay,
filename_out=None
):
""" Function adds overlay to existing file.
"""
if filename_out is None:
filename_out = filename
filename = op.expanduser(filename)
data = dicom.read_file(filename)
data = self.encode_overlay_slice(data, overlay, i_overlay)
data.save_as(filename_out)
pass
[docs] def encode_overlay_slice(self, data, overlay, i_overlay):
"""
"""
# overlay index
n_bits = 8
# On (60xx,3000) are stored ovelays.
# First is (6000,3000), second (6002,3000), third (6004,3000),
# and so on.
dicom_tag1 = 0x6000 + 2 * i_overlay
# data (0x6000, 0x3000)
# WR = 'OW'
# WM = 1
# On (60xx,0010) and (60xx,0011) is stored overlay size
row_el = dicom.dataelem.DataElement(
(dicom_tag1, 0x0010),
'US',
int(overlay.shape[0])
)
data[row_el.tag] = row_el
col_el = dicom.dataelem.DataElement(
(dicom_tag1, 0x0011),
'US',
int(overlay.shape[1])
)
data[col_el.tag] = col_el
# arrange values to bit array
overlay_linear = np.reshape(overlay, np.prod(overlay.shape))
# allocation of dataspace
encoded_linear = np.zeros(
(np.prod(overlay.shape) / n_bits).astype(np.int),
dtype=np.uint8
)
# encoded data
for i in range(0, len(overlay_linear)):
if overlay_linear[i] > 0:
bit = 1
else:
bit = 0
encoded_linear[int(i / n_bits)] |= bit << (i % n_bits)
overlay_raw = encoded_linear.tostring()
# Decoding data. Each bit is stored as array element
# TODO neni tady ta jednička blbě?
# for i in range(1,len(overlay_raw)):
# for k in range (0,n_bits):
# byte_as_int = ord(overlay_raw[i])
# decoded_linear[i*n_bits + k] = (byte_as_int >> k) & 0b1
#
# overlay = np.array(pol)
overlay_el = dicom.dataelem.DataElement(
(dicom_tag1, 0x3000),
'OW',
overlay_raw
)
data[overlay_el.tag] = overlay_el
return data
def _makedirs(self, filepattern, series_number=None):
filename = get_first_filename(filepattern, series_number=series_number)
datadir, dataname = os.path.split(filename)
if datadir != "" and not os.path.exists(datadir):
os.makedirs(datadir)
[docs] def save_image_stack(self, data3d, filepattern, metadata=None):
if (metadata is not None) and "series_number" in metadata.keys():
series_number = int(metadata["series_number"])
else:
series_number = get_unoccupied_series_number(filepattern)
self._makedirs(filepattern, series_number=series_number)
datadir, dataname = os.path.split(filepattern)
databasename, dataext = os.path.splitext(dataname)
if filepattern.find('{') < 0:
# filepattern does not contain place for integer
filepattern = os.path.join(
datadir,
databasename + "{:05d}" + dataext)
# print(filepattern)
if (metadata is not None) and "voxelsize_mm" in metadata.keys():
z_position = 0.0 # metadata["voxelsize_mm"][0]
z_vs = metadata["voxelsize_mm"][0]
else:
z_position = 0.0
z_vs = 1.0
total_number = data3d.shape[0]
for i in range(total_number):
if self.progress_callback is not None:
self.progress_callback(value=i, minimum=0, maximum=total_number)
if self.stop_writing:
break
newfilename = filename_format(filepattern, slice_number=i, slice_position=z_position, series_number=series_number)
# newfilename = filepattern.format(i)
logger.debug(newfilename)
data2d = data3d[i, :, :]
import SimpleITK as sitk
# pixelType = itk.UC
# imageType = itk.Image[pixelType, 2]
dim = sitk.GetImageFromArray(data2d)
if metadata is not None:
vsz = np.asarray(metadata['voxelsize_mm']).astype('double')
dim.SetSpacing([vsz[0], vsz[2], vsz[1]])
# import ipdb; ipdb.set_trace()
sitk.WriteImage(dim, newfilename)
z_position += z_vs
[docs] def stop(self):
self.stop_writing = True
[docs]def get_first_filename(filepattern, series_number=None):
filepattern = os.path.expanduser(filepattern)
if series_number is None:
series_number = get_unoccupied_series_number(filepattern)
return filename_format(filepattern, series_number=series_number)
[docs]def get_unoccupied_series_number(filepattern, series_number=1):
filepattern = os.path.expanduser(filepattern)
filename = filename_format(filepattern, series_number=series_number)
while os.path.exists(filename):
series_number += 1
fn = filename_format(filepattern, series_number=series_number)
if fn == filename:
# no series number is used in filepattern
return series_number -1
filename = fn
return series_number
# def _fill_filepattern_based_on_rexp(rexp1, filepattern, something_to_fill):
# if type(something_to_fill) is str:
# sub1 = re.sub(rexp1, "", filepattern)
#
# # now it looks for series number
# rexp1 = r"({\s*seriesn\s*:?.*?})"
# rexp2 = r"({\s*series_number\s*:?.*?})"
#
# sub1 = re.findall(rexp1, filepattern)
# sub2 = re.findall(rexp2, filepattern)
#
# for single_pattern in sub1:
# pattern = single_pattern.format(series_number=series_number, seriesn=series_number)
# filepattern = re.sub(rexp1, pattern, filepattern)
#
# for single_pattern in sub2:
# pattern = single_pattern.format(series_number=series_number, seriesn=series_number)
# filepattern = re.sub(rexp2, pattern, filepattern)
[docs]def filepattern_fill_slice_number_or_position(filepattern, slice_description):
# for string like series number there should be ignored (removed) the formating part
if type(slice_description) is str:
rexp1 = r"{\s*slicen\s*(:?.*?)}"
rexp2 = r"{\s*slice_number\s*(:?.*?)}"
rexp3 = r"{\s*slicep\s*(:?.*?)}"
rexp4 = r"{\s*slice_position\s*(:?.*?)}"
rexp5 = r"{\s*(:.*?)}"
filepattern = re.sub(rexp1, "", filepattern)
filepattern = re.sub(rexp2, "", filepattern)
filepattern = re.sub(rexp3, "", filepattern)
filepattern = re.sub(rexp4, "", filepattern)
filepattern = re.sub(rexp5, "", filepattern)
# now it looks for series number
rexp1 = r"({\s*slicen\s*:?.*?})"
rexp2 = r"({\s*slice_number\s*:?.*?})"
rexp3 = r"({\s*slicep\s*:?.*?})"
rexp4 = r"({\s*slice_position\s*:?.*?})"
rexp5 = r"({\s*:.*?})"
rexp6 = r"({\s*})"
sub1 = re.findall(rexp1, filepattern)
sub2 = re.findall(rexp2, filepattern)
sub3 = re.findall(rexp3, filepattern)
sub4 = re.findall(rexp4, filepattern)
sub5 = re.findall(rexp5, filepattern)
sub6 = re.findall(rexp5, filepattern)
for single_pattern in sub1:
pattern = single_pattern.format(slice_number=slice_description, slicen=slice_description, slice_position=slice_description, slicep=slice_description)
filepattern = re.sub(rexp1, pattern, filepattern)
for single_pattern in sub2:
pattern = single_pattern.format(slice_number=slice_description, slicen=slice_description, slice_position=slice_description, slicep=slice_description)
filepattern = re.sub(rexp2, pattern, filepattern)
for single_pattern in sub3:
pattern = single_pattern.format(slice_number=slice_description, slicen=slice_description, slice_position=slice_description, slicep=slice_description)
filepattern = re.sub(rexp3, pattern, filepattern)
for single_pattern in sub4:
pattern = single_pattern.format(slice_number=slice_description, slicen=slice_description, slice_position=slice_description, slicep=slice_description)
filepattern = re.sub(rexp4, pattern, filepattern)
for single_pattern in sub5:
pattern = single_pattern.format(slice_number=slice_description, slicen=slice_description, slice_position=slice_description, slicep=slice_description)
filepattern = re.sub(rexp5, pattern, filepattern)
for single_pattern in sub6:
pattern = single_pattern.format(slice_number=slice_description, slicen=slice_description, slice_position=slice_description, slicep=slice_description)
filepattern = re.sub(rexp6, pattern, filepattern)
return filepattern
[docs]def filepattern_fill_series_number(filepattern, series_number):
# for string like series number there should be ignored (removed) the formating part
if type(series_number) is str:
rexp1 = r"{\s*seriesn\s*(:?.*?)}"
rexp2 = r"{\s*series_number\s*(:?.*?)}"
filepattern = re.sub(rexp1, "", filepattern)
filepattern = re.sub(rexp2, "", filepattern)
# now it looks for series number
rexp1 = r"({\s*seriesn\s*:?.*?})"
rexp2 = r"({\s*series_number\s*:?.*?})"
sub1 = re.findall(rexp1, filepattern)
sub2 = re.findall(rexp2, filepattern)
for single_pattern in sub1:
pattern = single_pattern.format(series_number=series_number, seriesn=series_number)
filepattern = re.sub(rexp1, pattern, filepattern)
for single_pattern in sub2:
pattern = single_pattern.format(series_number=series_number, seriesn=series_number)
filepattern = re.sub(rexp2, pattern, filepattern)
return filepattern
[docs]def saveOverlayToDicomCopy(input_dcmfilelist, output_dicom_dir, overlays,
crinfo, orig_shape):
""" Save overlay to dicom. """
from . import datawriter as dwriter
# import qmisc
if not os.path.exists(output_dicom_dir):
os.makedirs(output_dicom_dir)
import imtools.image_manipulation
# uncrop all overlays
for key in overlays:
overlays[key] = imtools.image_manipulation.uncrop(overlays[key], crinfo, orig_shape)
dw = dwriter.DataWriter()
dw.DataCopyWithOverlay(input_dcmfilelist, output_dicom_dir, overlays)