Commits
Benjamin Bean authored 4d704b8cd9d
34 34 | itersDone = field0[chan][stoke]['iterDone'][ncycles/2] |
35 35 | |
36 36 | 3. To get the available minor cycle summary statistics: |
37 37 | field0 = summ_min[0] |
38 38 | chan0 = field0.keys()[0] |
39 39 | stoke0 = field0[chan0].keys()[0] |
40 40 | availSummStats = field0[field0][stoke0].keys() |
41 41 | """ |
42 42 | # 0 1 2 3 4 5 6 7 8 9 10 11 "No Mask" 12 13 14 15 16 17 18 |
43 43 | rowDescriptionsOldOrder = ["iterDone", "peakRes", "modelFlux", "cycleThresh", "deconId", "chan", "stoke", "cycleStartIters", "startIterDone", "startPeakRes", "startModelFlux", "startPeakResNM", "peakResNM", "masksum", "mpiServer", "peakMem", "runtime", "multifieldId", "stopCode"] |
44 + | rowDescriptions13683 = ["iterDone", "peakRes", "modelFlux", "cycleThresh", "deconId", "chan"] |
44 45 | # rowDescriptions does not include {"multifieldId", "chan", "stoke", "deconId"}, and so the returned dictionary will not include those values in the summary keys |
45 46 | rowDescriptions = ["startIterDone", "iterDone", "startPeakRes", "peakRes", "startModelFlux", "modelFlux", "startPeakResNM", "peakResNM", "cycleThresh", "cycleStartIters", "masksum", "mpiServer", "peakMem", "runtime", "stopCode"] |
46 47 | rowStartDescs = ["startIterDone", "startPeakRes", "startModelFlux", "startPeakResNM"] |
47 48 | |
48 49 | def convertMatrix(summaryminor_matrix, calc_iterdone_deltas=None, keep_startvals=None): |
49 50 | # casalog.post(summaryminor_matrix, "SEVERE") |
50 51 | ret = {} |
51 52 | |
52 53 | # edge case: no iterations were done (eg threshold < model flux) |
53 54 | if summaryminor_matrix.shape[1] == 0: |
110 111 | continue |
111 112 | for rowIn in range(nrowsIn): |
112 113 | rowOut = rowIn |
113 114 | matrixOut[rowOut,colOut] = matrixIn[rowIn,colIn] |
114 115 | maxRowOut = max(rowOut, maxRowOut) |
115 116 | maxColOut = colOut |
116 117 | colOut += 1 |
117 118 | |
118 119 | return matrixOut |
119 120 | |
120 - | def getRowDescriptionsOldOrder(): |
121 + | def useSmallSummaryminor(ignored_parameter=None): |
122 + | """Temporary CAS-13683 workaround""" |
123 + | if ('USE_SMALL_SUMMARYMINOR' in os.environ): |
124 + | uss = os.environ['USE_SMALL_SUMMARYMINOR'].lower() |
125 + | if (uss == "true"): |
126 + | return True |
127 + | return False |
128 + | |
129 + | def _getRowDescriptionsOldOrder(useSmallSummaryminor): |
130 + | """Temporary CAS-13683 workaround""" |
131 + | if (useSmallSummaryminor): |
132 + | return SummaryMinor.rowDescriptions13683 |
121 133 | return SummaryMinor.rowDescriptionsOldOrder |
122 134 | |
135 + | def getRowDescriptionsOldOrder(): |
136 + | """ Retrieves brief descriptions of the available minor cycle summary rows, in the old (matrix) order. """ |
137 + | return SummaryMinor._getRowDescriptionsOldOrder(SummaryMinor.useSmallSummaryminor()) |
138 + | |
139 + | def _getRowDescriptions(useSmallSummaryminor): |
140 + | """Temporary CAS-13683 workaround""" |
141 + | ret = SummaryMinor.rowDescriptions |
142 + | availRows = SummaryMinor._getRowDescriptionsOldOrder(useSmallSummaryminor) |
143 + | ret = list(filter(lambda x: x in availRows, ret)) |
144 + | return ret |
145 + | |
123 146 | def getRowDescriptions(): |
124 - | return SummaryMinor.rowDescriptions |
147 + | """ Retrieves brief descriptions of the available minor cycle summary rows """ |
148 + | return SummaryMinor._getRowDescriptions(SummaryMinor.useSmallSummaryminor()) |
149 + | |
150 + | def _getRowStartDescs(useSmallSummaryminor): |
151 + | """Temporary CAS-13683 workaround""" |
152 + | ret = SummaryMinor.rowStartDescs |
153 + | availRows = SummaryMinor._getRowDescriptionsOldOrder(useSmallSummaryminor) |
154 + | ret = list(filter(lambda x: x in availRows, ret)) |
155 + | return ret |
125 156 | |
126 157 | def getRowStartDescs(): |
127 - | return SummaryMinor.rowStartDescs |
158 + | """ Retrieves abreviated names of the available minor cycle summary "start" rows. |
159 + | |
160 + | These are the rows that catalog the values at the beggining of a minor cycle (pre-deconvolution). """ |
161 + | return SummaryMinor._getRowStartDescs(SummaryMinor.useSmallSummaryminor()) |
128 162 | |
129 163 | def indexMinorCycleSummaryBySubimage(matrix): |
130 164 | """ Re-indexes matrix from [row,column] to [channel,stokes,row,cycle]. |
131 165 | |
132 166 | Param matrix: the original matrix to convert. |
133 167 | """ |
134 168 | # get some properties of the summary_minor matrix |
135 169 | nrows = matrix.shape[0] |
136 170 | ncols = matrix.shape[1] |
171 + | uss = SummaryMinor.useSmallSummaryminor() # Temporary CAS-13683 workaround |
137 172 | import sys |
138 173 | oldChanIdx = SummaryMinor.getRowDescriptionsOldOrder().index("chan") |
139 - | oldStokeIdx = SummaryMinor.getRowDescriptionsOldOrder().index("stoke") |
174 + | if not uss: |
175 + | oldStokeIdx = SummaryMinor.getRowDescriptionsOldOrder().index("stoke") |
140 176 | chans = list(np.sort(np.unique(matrix[oldChanIdx]))) |
141 177 | chans = [int(x) for x in chans] |
142 - | stokes = list(np.sort(np.unique(matrix[oldStokeIdx]))) |
143 - | stokes = [int(x) for x in stokes] |
178 + | if uss: |
179 + | stokes = [0] |
180 + | else: |
181 + | stokes = list(np.sort(np.unique(matrix[oldStokeIdx]))) |
182 + | stokes = [int(x) for x in stokes] |
144 183 | ncycles = 0 |
145 184 | if len(chans) > 0 and len(stokes) > 0: |
146 185 | ncycles = int( ncols / (len(chans)*len(stokes)) ) |
186 + | if uss: |
187 + | try: |
188 + | from casampi.MPIEnvironment import MPIEnvironment |
189 + | if MPIEnvironment.is_mpi_enabled: |
190 + | # This is necessary because we may have an odd number of "channels" due to each process getting only a subchunk. |
191 + | # Example: |
192 + | # Process 1 gets stokes 0-1, process 2 gets stokes 2 |
193 + | # Each of them assigns channel id = chan + stoke * nsubstokes |
194 + | # Process 1 assigns channel ids [0,2], Process 2 assigns channel id 0. |
195 + | # This hack is not needed when not using a small summary minor because we have the extra knowledge of the stokes, instead of mapping stokes + channels onto chunks. |
196 + | chanslist = matrix[oldChanIdx].tolist() |
197 + | for chan in chans: |
198 + | singlechan_occurances = list(filter(lambda x: x == chan, chanslist)) |
199 + | ncycles = max(ncycles, len(singlechan_occurances)) |
200 + | except ModuleNotFoundError as e: |
201 + | raise |
147 202 | |
148 203 | # ret is the return dictionary[chans][stokes][rows][cycles] |
149 204 | # cummulativeCnt counts how many cols we've read for each channel/stokes/row |
150 205 | ret = {desc:[0]*ncycles for desc in SummaryMinor.getRowDescriptions()} |
151 206 | ret = {stoke:copy.deepcopy(ret) for stoke in stokes} |
152 207 | ret = {chan:copy.deepcopy(ret) for chan in chans} |
153 208 | cummulativeCnt = copy.deepcopy(ret) # copy ret's structure |
154 209 | |
155 210 | # reindex based on subimage index (aka chan/stoke index) |
156 211 | for desc in SummaryMinor.getRowDescriptions(): |
157 212 | oldRowIdx = SummaryMinor.getRowDescriptionsOldOrder().index(desc) |
158 213 | for colIdx in range(ncols): |
159 214 | chan = int(matrix[oldChanIdx][colIdx]) |
160 - | stoke = int(matrix[oldStokeIdx][colIdx]) |
215 + | if uss: |
216 + | stoke = 0 |
217 + | else: |
218 + | stoke = int(matrix[oldStokeIdx][colIdx]) |
161 219 | val = matrix[oldRowIdx][colIdx] |
162 220 | cummulativeCol = int(cummulativeCnt[chan][stoke][desc][0]) # const 0: cummulativeCnt doesn't make use of 'cycle' index from copied ret structure |
163 221 | ret[chan][stoke][desc][cummulativeCol] = val |
164 222 | cummulativeCnt[chan][stoke][desc][0] += 1 |
165 223 | |
166 224 | return ret |
167 225 | |
168 226 | def _getPerCycleDict(summaryminor_dict, calc_iterdone_deltas=None, keep_startvals=None): |
169 227 | calc_iterdone_deltas = True if (calc_iterdone_deltas == None) else calc_iterdone_deltas |
170 228 | keep_startvals = True if (keep_startvals == None) else keep_startvals |