# %% import gc import os import sys from pathlib import Path import numpy as np import pandas as pd import SimpleITK as sitk from tqdm import tqdm import dicom_utils.dicom_utils as du import dicom_utils.dicom_utils.visualize as viz from config import get_project_root from dicom_utils.dicom_utils import augmentation as aug from dicom_utils.dicom_utils import processing as dup from utils import filter_outbounds, sitk_to_numpy from matplotlib import pyplot as plt def resize(image, size): N_CHANNELS = image.shape[0] image_seq = [sitk.GetImageFromArray(image[i, :, :, :]) for i in range(N_CHANNELS)] image_seq = [dup.resample(image, size=(size, size, size)) for image in image_seq] image_seq = [sitk.GetArrayFromImage(image) for image in image_seq] image = np.stack(image_seq, axis=0) return image # %% PROJECT_ROOT = get_project_root() BOX_SIZE = 128 # mm == pixels for voxel size of 1mm3 RESIZE = True DATASET_NAME = 'HN_val' PROJECT_DATA_PATH = PROJECT_ROOT / 'data' / DATASET_NAME PROCESSED_DATA_PATH = PROJECT_DATA_PATH / 'processed' CLINICAL_DATA_FILENAME_CLEAN = f'clinical_{DATASET_NAME}.csv' DCM_DIR = PROCESSED_DATA_PATH / 'dcm' BBOX_OUT_DIR = PROCESSED_DATA_PATH / 'bbox' BBOX_SUBDATASET_DEEP = f'bbox_{BOX_SIZE//2}_deep' BBOX_SUBDATASET_RAD = f'bbox_rad' OUTDIR_DEEP = BBOX_OUT_DIR / BBOX_SUBDATASET_DEEP OUTDIR_RAD = BBOX_OUT_DIR / BBOX_SUBDATASET_RAD os.makedirs(OUTDIR_DEEP, exist_ok=True) os.makedirs(OUTDIR_RAD, exist_ok=True) CT_ranges = [-1050, 3050] PT_ranges = [0, 50] VOXEL_SIZE = [1, 1, 1] # gaussian = sitk.SmoothingRecursiveGaussianImageFilter() # gaussian.SetSigma(2) # %% clinical = pd.read_csv(PROCESSED_DATA_PATH / CLINICAL_DATA_FILENAME_CLEAN) available_patients = patients = [ f for f in os.listdir(DCM_DIR) if os.path.isdir(DCM_DIR / f) ] clinical = clinical.loc[clinical['patient'].isin(available_patients)] errors = [] # %% for i, row in clinical.iterrows(): patient = row['patient'] filename_out = row['filename'] roi_name = row['ROI_name'] roi_modality = row['ROI_modality'] try: scan_CT = du.load_series(DCM_DIR / patient / 'CT') # scan_PT = du.load_SUV(DCM_DIR / patient / 'PT') scan_PT = du.load_series(DCM_DIR / patient / 'PT') rs_filename = os.listdir(DCM_DIR / patient / 'RTSTRUCT')[0] if roi_modality == 'CT': segmentation = du.load_roi( DCM_DIR / patient / 'RTSTRUCT' / rs_filename, roi_name, scan_CT ) elif roi_modality == 'PT': segmentation = du.load_roi( DCM_DIR / patient / 'RTSTRUCT' / rs_filename, roi_name, scan_PT ) else: raise ValueError('ROI modality must be `CT` or `PT`.') # Calculate segmentation mask vertices print(type(segmentation)) start_mm, stop_mm = du.get_bbox_vertices(segmentation) # print(start_mm, stop_mm) center_mm = (np.array(stop_mm) + np.array(start_mm)) / 2 # Calculate vertices for deep learning bbox centered at the center of the segmentation mask start_mm = ( center_mm - BOX_SIZE // 2 - 5 ) # add margin to allow a better interpolation stop_mm = center_mm + BOX_SIZE // 2 + 5 scan_CT_box = du.extract_volume(scan_CT, start_mm, stop_mm) print(scan_CT_box.GetSize()) # scan_CT_box = filter_outbounds(scan_CT_box, CT_ranges) scan_CT_box = du.processing.resample(scan_CT_box, spacing=VOXEL_SIZE) print(scan_CT_box.GetSize()) scan_PT_box = sitk.Resample(scan_PT, scan_CT_box) #  scan_PT_box = filter_outbounds(scan_PT_box, PT_ranges) start_mm = start_mm + 5 # remove margin stop_mm = stop_mm - 5 # remove margin scan_CT_box = du.extract_volume(scan_CT_box, start_mm, stop_mm) print(scan_CT_box.GetSize()) scan_PT_box = du.extract_volume(scan_PT_box, start_mm, stop_mm) out_deep = np.stack( [sitk_to_numpy(scan_CT_box), sitk_to_numpy(scan_PT_box)], axis=0 ) out_deep_resized = resize(out_deep, size=BOX_SIZE // 2) # Save np.save(OUTDIR_DEEP / filename_out, out_deep_resized) # segmentation_box = sitk.Resample(segmentation, scan_CT_box) # segmentation_box = gaussian.Execute(segmentation_box) # mask_box = segmentation_box > 0.5 # print(type(mask_box)) # mask_box = du.extract_volume(mask_box, start_mm, stop_mm) # Save # os.makedirs(OUTDIR_RAD / patient, exist_ok=False) # # sitk.WriteImage(scan_CT_box, str(OUTDIR_RAD / patient / 'CT.nrrd')) # sitk.WriteImage(scan_PT_box, str(OUTDIR_RAD / patient / 'PT.nrrd')) # sitk.WriteImage(mask_box, str(OUTDIR_RAD / patient / 'mask.nrrd')) except Exception as e: print(f'\nError processing patient: {row["patient"]}') print('Exception:', e) errors.append(row['patient']) # %% print(errors)