1+ from __future__ import annotations
2+
3+ from portpy .photon import Optimization
4+ from typing import TYPE_CHECKING
5+ if TYPE_CHECKING :
6+ from portpy .photon .plan import Plan
7+ from portpy .photon .influence_matrix import InfluenceMatrix
8+ from portpy .photon .clinical_criteria import ClinicalCriteria
9+ import cvxpy as cp
10+ import numpy as np
11+ from copy import deepcopy
12+ try :
13+ from sklearn .utils .extmath import randomized_svd
14+ except ImportError :
15+ pass
16+ import scipy
17+ try :
18+ import pywt
19+ except ImportError :
20+ pass
21+
22+
23+ class CompressRTPOptimization (Optimization ):
24+ """
25+ Class for Compressed RTP optimization. It is child class of PortPy.Photon Optimization class
26+
27+ - **Attributes** ::
28+ :param my_plan: object of class Plan
29+ :param inf_matrix: object of class InfluenceMatrix
30+ :param clinical_criteria: object of class ClinicalCriteria
31+ :param opt_params: dictionary of vmat optimization parameters
32+ :param vars: dictionary of variables
33+
34+ :Example:
35+ >>> opt = CompressRTPOptimization(my_plan=my_plan, inf_matrix=inf_matrix, clinical_criteria=clinical_criteria, opt_params=vmat_opt_params)
36+ >>> opt.create_cvxpy_problem_compressed(solver='MOSEK', verbose=True)
37+
38+ - **Methods** ::
39+ :create_cvxpy_problem_compressed()
40+ Creates cvxpy problem for solving using compressed data
41+ """
42+ def __init__ (self , my_plan : Plan , inf_matrix : InfluenceMatrix = None ,
43+ clinical_criteria : ClinicalCriteria = None ,
44+ opt_params : dict = None , vars : dict = None ):
45+ # Call the constructor of the base class (Optimization) using super()
46+ super ().__init__ (my_plan = my_plan , inf_matrix = inf_matrix ,
47+ clinical_criteria = clinical_criteria ,
48+ opt_params = opt_params , vars = vars )
49+
50+ def create_cvxpy_problem_compressed (self , S = None , H = None , W = None ):
51+
52+ """
53+ It runs optimization to create optimal plan based upon clinical criteria
54+
55+ :param S: sparse influence matrix. Uses influence matrix in my_plan by default
56+ :param H: tall skinny matrix. It is obtained using SVD of L = A-S. UKV = svd(L, rank=k). H=U
57+ :param W: thin wide matrix. It is obtained using SVD of L = A-S. W=KV
58+
59+ """
60+ # unpack data
61+ my_plan = self .my_plan
62+ inf_matrix = self .inf_matrix
63+ opt_params = self .opt_params
64+ clinical_criteria = self .clinical_criteria
65+ x = self .vars ['x' ]
66+ obj = self .obj
67+ constraints = self .constraints
68+
69+ # get opt params for optimization
70+ obj_funcs = opt_params ['objective_functions' ] if 'objective_functions' in opt_params else []
71+ opt_params_constraints = opt_params ['constraints' ] if 'constraints' in opt_params else []
72+
73+ if S is None :
74+ S = inf_matrix .A
75+ num_fractions = clinical_criteria .get_num_of_fractions ()
76+ st = inf_matrix
77+ if W is None and H is None :
78+ H = np .zeros ((S .shape [0 ], 1 ))
79+ W = np .zeros ((1 , S .shape [1 ]))
80+
81+ # Construct optimization problem
82+ Wx = cp .Variable (H .shape [1 ]) # creating dummy variable for dose
83+ # Generating objective functions
84+ print ('Objective Start' )
85+ for i in range (len (obj_funcs )):
86+ if obj_funcs [i ]['type' ] == 'quadratic-overdose' :
87+ if obj_funcs [i ]['structure_name' ] in my_plan .structures .get_structures ():
88+ struct = obj_funcs [i ]['structure_name' ]
89+ if len (st .get_opt_voxels_idx (struct )) == 0 : # check if there are any opt voxels for the structure
90+ continue
91+ key = self .matching_keys (obj_funcs [i ], 'dose' )
92+ dose_gy = self .dose_to_gy (key , obj_funcs [i ][key ]) / num_fractions
93+ dO = cp .Variable (len (st .get_opt_voxels_idx (struct )), pos = True )
94+ obj += [(1 / len (st .get_opt_voxels_idx (struct ))) * (obj_funcs [i ]['weight' ] * cp .sum_squares (dO ))]
95+ constraints += [S [st .get_opt_voxels_idx (struct ), :] @ x + H [st .get_opt_voxels_idx (struct ), :] @ Wx <= dose_gy + dO ]
96+ elif obj_funcs [i ]['type' ] == 'quadratic-underdose' :
97+ if obj_funcs [i ]['structure_name' ] in my_plan .structures .get_structures ():
98+ struct = obj_funcs [i ]['structure_name' ]
99+ if len (st .get_opt_voxels_idx (struct )) == 0 :
100+ continue
101+ key = self .matching_keys (obj_funcs [i ], 'dose' )
102+ dose_gy = self .dose_to_gy (key , obj_funcs [i ][key ]) / num_fractions
103+ dU = cp .Variable (len (st .get_opt_voxels_idx (struct )), pos = True )
104+ obj += [(1 / len (st .get_opt_voxels_idx (struct ))) * (obj_funcs [i ]['weight' ] * cp .sum_squares (dU ))]
105+ constraints += [S [st .get_opt_voxels_idx (struct ), :] @ x + H [st .get_opt_voxels_idx (struct ), :] @ Wx >= dose_gy - dU ]
106+ elif obj_funcs [i ]['type' ] == 'quadratic' :
107+ if obj_funcs [i ]['structure_name' ] in my_plan .structures .get_structures ():
108+ struct = obj_funcs [i ]['structure_name' ]
109+ if len (st .get_opt_voxels_idx (struct )) == 0 :
110+ continue
111+ obj += [(1 / len (st .get_opt_voxels_idx (struct ))) * (
112+ obj_funcs [i ]['weight' ] * cp .sum_squares (S [st .get_opt_voxels_idx (struct ), :] @ x + H [st .get_opt_voxels_idx (struct ), :] @ Wx ))]
113+ elif obj_funcs [i ]['type' ] == 'smoothness-quadratic' :
114+ [Qx , Qy , num_rows , num_cols ] = self .get_smoothness_matrix (inf_matrix .beamlets_dict )
115+ smoothness_X_weight = 0.6
116+ smoothness_Y_weight = 0.4
117+ obj += [obj_funcs [i ]['weight' ] * (smoothness_X_weight * (1 / num_cols ) * cp .sum_squares (Qx @ x ) +
118+ smoothness_Y_weight * (1 / num_rows ) * cp .sum_squares (Qy @ x ))]
119+
120+ print ('Objective done' )
121+
122+ print ('Constraints Start' )
123+
124+ constraint_def = deepcopy (
125+ clinical_criteria .get_criteria ()) # get all constraints definition using clinical criteria
126+
127+ # add/modify constraints definition if present in opt params
128+ for opt_constraint in opt_params_constraints :
129+ # add constraint
130+ param = opt_constraint ['parameters' ]
131+ if param ['structure_name' ] in my_plan .structures .get_structures ():
132+ criterion_exist , criterion_ind = clinical_criteria .check_criterion_exists (opt_constraint ,
133+ return_ind = True )
134+ if criterion_exist :
135+ constraint_def [criterion_ind ] = opt_constraint
136+ else :
137+ constraint_def += [opt_constraint ]
138+
139+ # Adding max/mean constraints
140+ for i in range (len (constraint_def )):
141+ if constraint_def [i ]['type' ] == 'max_dose' :
142+ limit_key = self .matching_keys (constraint_def [i ]['constraints' ], 'limit' )
143+ if limit_key :
144+ limit = self .dose_to_gy (limit_key , constraint_def [i ]['constraints' ][limit_key ])
145+ org = constraint_def [i ]['parameters' ]['structure_name' ]
146+ if org != 'GTV' and org != 'CTV' :
147+ if org in my_plan .structures .get_structures ():
148+ if len (st .get_opt_voxels_idx (org )) == 0 :
149+ continue
150+ constraints += [S [st .get_opt_voxels_idx (org ), :] @ x + H [st .get_opt_voxels_idx (org ), :] @ Wx <= limit / num_fractions ]
151+ elif constraint_def [i ]['type' ] == 'mean_dose' :
152+ limit_key = self .matching_keys (constraint_def [i ]['constraints' ], 'limit' )
153+ if limit_key :
154+ limit = self .dose_to_gy (limit_key , constraint_def [i ]['constraints' ][limit_key ])
155+ org = constraint_def [i ]['parameters' ]['structure_name' ]
156+ # mean constraints using voxel weights
157+ if org in my_plan .structures .get_structures ():
158+ if len (st .get_opt_voxels_idx (org )) == 0 :
159+ continue
160+ fraction_of_vol_in_calc_box = my_plan .structures .get_fraction_of_vol_in_calc_box (org )
161+ limit = limit / fraction_of_vol_in_calc_box # modify limit due to fraction of volume receiving no dose
162+ constraints += [(1 / sum (st .get_opt_voxels_volume_cc (org ))) *
163+ (cp .sum ((cp .multiply (st .get_opt_voxels_volume_cc (org ),
164+ S [st .get_opt_voxels_idx (org ), :] @ x + H [st .get_opt_voxels_idx (org ), :] @ Wx ))))
165+ <= limit / num_fractions ]
166+
167+ constraints += [Wx == (W @ x )]
168+ print ('Constraints done' )
169+
170+ def get_sparse_plus_low_rank (self , A = None , thresold_perc = 1 , rank = 5 ):
171+ """
172+ :param A: dose influence matrix
173+ :param thresold_perc: thresold percentage. Default to 1% of max(A)
174+ :type rank: rank of L = A-S.
175+ :returns: S, H, W using randomized svd
176+ """
177+ if A is None :
178+ A = deepcopy (self .inf_matrix .A )
179+ tol = np .max (A ) * thresold_perc * 0.01
180+ # S = S*0
181+ S = np .where (A > tol , A , 0 )
182+ if rank == 0 :
183+ H = np .zeros ((A .shape [0 ], 1 ))
184+ W = np .zeros ((1 , A .shape [1 ]))
185+ else :
186+ print ('Running svd..' )
187+ [U , svd_S , V ] = randomized_svd (A - S , n_components = rank + 1 , random_state = 0 )
188+ print ('svd done!' )
189+ H = U [:, :rank ]
190+ W = np .diag (svd_S [:rank ]) @ V [:rank , :]
191+ S = scipy .sparse .csr_matrix (S )
192+ return S , H , W
193+
194+ def get_low_dim_basis (self , inf_matrix : InfluenceMatrix = None , compression : str = 'wavelet' ):
195+ """
196+ :param inf_matrix: an object of class InfluenceMatrix for the specified plan
197+ :param compression: the compression method
198+ :type compression: str
199+ :return: a list that contains the dimension reduction basis in the format of array(float)
200+ """
201+ if inf_matrix is None :
202+ inf_matrix = self .inf_matrix
203+ low_dim_basis = {}
204+ num_of_beams = len (inf_matrix .beamlets_dict )
205+ num_of_beamlets = inf_matrix .beamlets_dict [num_of_beams - 1 ]['end_beamlet_idx' ] + 1
206+ beam_id = [inf_matrix .beamlets_dict [i ]['beam_id' ] for i in range (num_of_beams )]
207+ beamlets = inf_matrix .get_bev_2d_grid (beam_id = beam_id )
208+ index_position = list ()
209+ for ind in range (num_of_beams ):
210+ low_dim_basis [beam_id [ind ]] = []
211+ for i in range (inf_matrix .beamlets_dict [ind ]['start_beamlet_idx' ],
212+ inf_matrix .beamlets_dict [ind ]['end_beamlet_idx' ] + 1 ):
213+ index_position .append ((np .where (beamlets [ind ] == i )[0 ][0 ], np .where (beamlets [ind ] == i )[1 ][0 ]))
214+ if compression == 'wavelet' :
215+ max_dim_0 = np .max ([beamlets [ind ].shape [0 ] for ind in range (num_of_beams )])
216+ max_dim_1 = np .max ([beamlets [ind ].shape [1 ] for ind in range (num_of_beams )])
217+ beamlet_2d_grid = np .zeros ((int (np .ceil (max_dim_0 / 2 )), int (np .ceil (max_dim_1 / 2 ))))
218+ for row in range (beamlet_2d_grid .shape [0 ]):
219+ for col in range (beamlet_2d_grid .shape [1 ]):
220+ beamlet_2d_grid [row ][col ] = 1
221+ approximation_coeffs = pywt .idwt2 ((beamlet_2d_grid , (None , None , None )), 'sym4' ,
222+ mode = 'periodization' )
223+ horizontal_coeffs = pywt .idwt2 ((None , (beamlet_2d_grid , None , None )), 'sym4' , mode = 'periodization' )
224+ for b in range (num_of_beams ):
225+ if ((2 * row + 1 < beamlets [b ].shape [0 ] and 2 * col + 1 < beamlets [b ].shape [1 ] and
226+ beamlets [b ][2 * row + 1 ][2 * col + 1 ] != - 1 ) or
227+ (2 * row + 1 < beamlets [b ].shape [0 ] and 2 * col < beamlets [b ].shape [1 ] and
228+ beamlets [b ][2 * row + 1 ][2 * col ] != - 1 ) or
229+ (2 * row < beamlets [b ].shape [0 ] and 2 * col + 1 < beamlets [b ].shape [1 ] and
230+ beamlets [b ][2 * row ][2 * col + 1 ] != - 1 ) or
231+ (2 * row < beamlets [b ].shape [0 ] and 2 * col < beamlets [b ].shape [1 ] and
232+ beamlets [b ][2 * row ][2 * col ] != - 1 )):
233+ approximation = np .zeros (num_of_beamlets )
234+ horizontal = np .zeros (num_of_beamlets )
235+ for ind in range (inf_matrix .beamlets_dict [b ]['start_beamlet_idx' ],
236+ inf_matrix .beamlets_dict [b ]['end_beamlet_idx' ] + 1 ):
237+ approximation [ind ] = approximation_coeffs [index_position [ind ]]
238+ horizontal [ind ] = horizontal_coeffs [index_position [ind ]]
239+ low_dim_basis [beam_id [b ]].append (np .transpose (np .stack ([approximation , horizontal ])))
240+ beamlet_2d_grid [row ][col ] = 0
241+ for b in beam_id :
242+ low_dim_basis [b ] = np .concatenate (low_dim_basis [b ], axis = 1 )
243+ u , s , vh = scipy .sparse .linalg .svds (low_dim_basis [b ], k = min (low_dim_basis [b ].shape [0 ], low_dim_basis [b ].shape [1 ]) - 1 )
244+ ind = np .where (s > 0.0001 )
245+ low_dim_basis [b ] = u [:, ind [0 ]]
246+ return np .concatenate ([low_dim_basis [b ] for b in beam_id ], axis = 1 )
0 commit comments