Commits
Benjamin Bean authored 4383525e3a3
43 43 | getAll(summary_key): returns the list of all values for the given summary key |
44 44 | getDict(calc_iterdone_deltas, keep_startvals): returns the per-cycle dictionary; calc_iterdone_deltas converts iterDone to per-cycle counts, keep_startvals controls whether the start* values are kept |
45 45 | getOutlierField(fieldId): to get a new SummaryMinor instance for the contents of an outlier field other than field 0 |
46 46 | |
47 47 | Extends the Python dictionary interface (try the command "help(dict)" for more information on built-in Python dicts). |
48 48 | |
49 49 | Note that this class will be converted to a plain Python dict when saved with methods like pickle.dump() or numpy.save(). |
50 50 | This is done to prevent issues when loading later, when this class might not be available to the Python interpreter.""" |
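A minimal usage sketch of the interface summarized above, assuming summ_min is a SummaryMinor instance (for example taken from a tclean return value); the variable names below are hypothetical:

    peak_res_row = summ_min.getAll("peakRes")    # one value per minor cycle, across all channels/polarities
    per_cycle = summ_min.getDict(calc_iterdone_deltas=True, keep_startvals=False)
    first_chan = sorted(per_cycle.keys())[0]     # the per-cycle dict is indexed [chan][pol][row][cycle]
    print(per_cycle[first_chan][0]["iterDone"])  # assumes polarity 0 is present in the data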
51 51 | # indices 0-18 into rowDescriptionsOldOrder below; "No Mask" (NM) marks the startPeakResNM/peakResNM entries at indices 11 and 12 |
52 52 | rowDescriptionsOldOrder = ["iterDone", "peakRes", "modelFlux", "cycleThresh", "mapperId", "chan", "pol", "cycleStartIters", "startIterDone", "startPeakRes", "startModelFlux", "startPeakResNM", "peakResNM", "masksum", "mpiServer", "peakMem", "runtime", "immod", "stopCode"] |
53 + | rowDescriptions13683 = ["iterDone", "peakRes", "modelFlux", "cycleThresh", "immod", "chan"] |
53 54 | rowDescriptions = ["startIterDone", "iterDone", "startPeakRes", "peakRes", "startModelFlux", "modelFlux", "startPeakResNM", "peakResNM", "cycleThresh", "mapperId", "cycleStartIters", "masksum", "mpiServer", "peakMem", "runtime", "immod", "stopCode", "chan", "pol"] |
54 55 | rowStartDescs = ["startIterDone", "startPeakRes", "startModelFlux", "startPeakResNM"] |
55 56 | |
56 57 | def __init__(self, summaryminor_matrix, summaryminor_dict = None): |
57 58 | self.summaryminor_matrix = summaryminor_matrix |
58 59 | self.singleFieldMatrix = summaryminor_matrix |
59 60 | self.fieldIds = SummaryMinor._getFieldIds(summaryminor_matrix) |
60 61 | if len(self.fieldIds) > 1: |
61 62 | self.singleFieldMatrix = SummaryMinor._getSingleFieldMatrix(summaryminor_matrix, self.fieldIds[0]) |
62 63 | if (len(self.fieldIds) > 1): |
118 119 | matrixOut[rowOut,colOut] = matrixIn[rowIn,colIn] |
119 120 | maxRowOut = max(rowOut, maxRowOut) |
120 121 | maxColOut = colOut |
121 122 | colOut += 1 |
122 123 | |
123 124 | return matrixOut |
124 125 | |
125 126 | def getMatrix(self, fieldId=-1): |
126 127 | """Returns the original numpy.ndarray matrix. |
127 128 | fieldId: None for the original matrix, -1 for the matrix used to populate this dictionary, or the id of another outlier field |
128 - | Index 0: row (see this.rowDescriptionsOldOrder) |
129 + | Index 0: row (see this.rowDescriptionsOldOrder/rowDescriptions13683) |
129 130 | Index 1: values for all the minor cycles and outlier fields""" |
130 131 | if (fieldId == None): |
131 132 | return self.summaryminor_matrix |
132 133 | if (fieldId == -1): |
133 134 | return self.singleFieldMatrix |
134 135 | return SummaryMinor._getSingleFieldMatrix(self.summaryminor_matrix, fieldId) |
135 136 | |
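As a quick illustration of the three fieldId modes described in the docstring above (summ_min again stands for a hypothetical SummaryMinor instance):

    full_matrix   = summ_min.getMatrix(None)   # the original ndarray, all outlier fields included
    field0_matrix = summ_min.getMatrix()       # default -1: the single-field matrix backing this dict
    field1_matrix = summ_min.getMatrix(1)      # re-extracts the columns of outlier field 1, if present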
136 137 | def getAll(self, summary_key): |
137 138 | """Return the numpy matrix of all values for the given key (eg "chan")""" |
138 139 | idx = self.getRowDescriptionsOldOrder().index(summary_key) |
139 140 | return self.getMatrix()[idx,:] |
140 141 | |
141 - | def getRowDescriptionsOldOrder(): |
142 + | def useSmallSummaryminor(ignored_parameter=None): |
143 + | """Temporary CAS-13683 workaround""" |
144 + | if ('USE_SMALL_SUMMARYMINOR' in os.environ): |
145 + | uss = os.environ['USE_SMALL_SUMMARYMINOR'].lower() |
146 + | if (uss == "true"): |
147 + | return True |
148 + | return False |
149 + | |
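The toggle above is driven purely by an environment variable; a sketch of how it might be exercised, assuming the variable is set before the summary rows are queried:

    import os
    os.environ["USE_SMALL_SUMMARYMINOR"] = "true"
    assert SummaryMinor.useSmallSummaryminor()   # any value other than "true" (case-insensitive) yields False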
150 + | def _getRowDescriptionsOldOrder(useSmallSummaryminor): |
151 + | """Temporary CAS-13683 workaround""" |
152 + | if (useSmallSummaryminor): |
153 + | return SummaryMinor.rowDescriptions13683 |
142 154 | return SummaryMinor.rowDescriptionsOldOrder |
143 155 | |
156 + | def getRowDescriptionsOldOrder(): |
157 + | return SummaryMinor._getRowDescriptionsOldOrder(SummaryMinor.useSmallSummaryminor()) |
158 + | |
159 + | def _getRowDescriptions(useSmallSummaryminor): |
160 + | """Temporary CAS-13683 workaround""" |
161 + | ret = SummaryMinor.rowDescriptions |
162 + | availRows = SummaryMinor._getRowDescriptionsOldOrder(useSmallSummaryminor) |
163 + | ret = list(filter(lambda x: x in availRows, ret)) |
164 + | return ret |
165 + | |
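To make the filtering concrete: with the small (CAS-13683) summary minor, only the rows present in rowDescriptions13683 survive, kept in rowDescriptions order:

    SummaryMinor._getRowDescriptions(True)
    # -> ['iterDone', 'peakRes', 'modelFlux', 'cycleThresh', 'immod', 'chan']
    SummaryMinor._getRowDescriptions(False)
    # -> the full 19-element rowDescriptions list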
144 166 | def getRowDescriptions(): |
145 - | return SummaryMinor.rowDescriptions |
167 + | return SummaryMinor._getRowDescriptions(SummaryMinor.useSmallSummaryminor()) |
168 + | |
169 + | def _getRowStartDescs(useSmallSummaryminor): |
170 + | """Temporary CAS-13683 workaround""" |
171 + | ret = SummaryMinor.rowStartDescs |
172 + | availRows = SummaryMinor._getRowDescriptionsOldOrder(useSmallSummaryminor) |
173 + | ret = list(filter(lambda x: x in availRows, ret)) |
174 + | return ret |
146 175 | |
147 176 | def getRowStartDescs(): |
148 - | return SummaryMinor.rowStartDescs |
177 + | return SummaryMinor._getRowStartDescs(SummaryMinor.useSmallSummaryminor()) |
149 178 | |
150 179 | def indexMinorCycleSummaryBySubimage(matrix): |
151 180 | """Re-indexes matrix from [row,column] to [channel,polarity,row,cycle].""" |
152 181 | # get some properties of the summary_minor matrix |
153 182 | nrows = matrix.shape[0] |
154 183 | ncols = matrix.shape[1] |
184 + | uss = SummaryMinor.useSmallSummaryminor() # Temporary CAS-13683 workaround |
155 185 | import sys |
156 - | oldChanIdx = SummaryMinor.getRowDescriptionsOldOrder().index("chan") |
157 - | newChanIdx = SummaryMinor.getRowDescriptions().index("chan") |
158 - | oldPolIdx = SummaryMinor.getRowDescriptionsOldOrder().index("pol") |
159 - | newPolIdx = SummaryMinor.getRowDescriptions().index("pol") |
186 + | oldChanIdx = SummaryMinor._getRowDescriptionsOldOrder(uss).index("chan") |
187 + | newChanIdx = SummaryMinor._getRowDescriptions(uss).index("chan") |
188 + | if not uss: |
189 + | oldPolIdx = SummaryMinor._getRowDescriptionsOldOrder(uss).index("pol") |
190 + | newPolIdx = SummaryMinor._getRowDescriptions(uss).index("pol") |
160 191 | chans = list(np.sort(np.unique(matrix[oldChanIdx]))) |
161 192 | chans = [int(x) for x in chans] |
162 - | pols = list(np.sort(np.unique(matrix[oldPolIdx]))) |
163 - | pols = [int(x) for x in pols] |
193 + | if uss: |
194 + | pols = [0] |
195 + | else: |
196 + | pols = list(np.sort(np.unique(matrix[oldPolIdx]))) |
197 + | pols = [int(x) for x in pols] |
164 198 | ncycles = 0 |
165 199 | if len(chans) > 0 and len(pols) > 0: |
166 200 | ncycles = int( ncols / (len(chans)*len(pols)) ) |
201 + | if uss: |
202 + | try: |
203 + | from casampi.MPIEnvironment import MPIEnvironment |
204 + | if MPIEnvironment.is_mpi_enabled: |
205 + | # This is necessary because we may have an odd number of "channels" due to each process getting only a subchunk. |
206 + | # Example: |
207 + | # Process 1 gets polarities 0-1, process 2 gets polarity 2 |
208 + | # Each of them assigns channel id = chan + pol * nsubpols |
209 + | # Process 1 assigns channel ids [0,2], Process 2 assigns channel id 0. |
210 + | # This hack is not needed when not using a small summary minor because we have the extra knowledge of the polarities, instead of mapping polarities + channels onto chunks. |
211 + | chanslist = matrix[oldChanIdx].tolist() |
212 + | for chan in chans: |
213 + | singlechan_occurances = list(filter(lambda x: x == chan, chanslist)) |
214 + | ncycles = max(ncycles, len(singlechan_occurances)) |
215 + | except ModuleNotFoundError as e: |
216 + | raise |
167 217 | |
168 218 | # ret is the return dictionary[chans][pols][rows][cycles] |
169 219 | # cummulativeCnt counts how many cols we've read for each channel/polarity/row |
170 - | ret = {desc:[0]*ncycles for desc in SummaryMinor.getRowDescriptions()} |
220 + | ret = {desc:[0]*ncycles for desc in SummaryMinor._getRowDescriptions(uss)} |
171 221 | # channel and polarity information is in the indexing, don't need to add it to the return dict |
172 222 | for desc in ["chan", "pol"]: |
173 223 | if desc in ret: |
174 224 | del ret[desc] |
175 225 | ret = {pol:copy.deepcopy(ret) for pol in pols} |
176 226 | ret = {chan:copy.deepcopy(ret) for chan in chans} |
177 227 | cummulativeCnt = copy.deepcopy(ret) # copy ret's structure |
178 228 | |
179 229 | # reindex based on subimage index (aka chan/pol index) |
180 - | for desc in SummaryMinor.getRowDescriptions(): |
230 + | for desc in SummaryMinor._getRowDescriptions(uss): |
181 231 | if desc in ["chan", "pol"]: |
182 232 | # channel and polarity information is in the indexing, don't need to add it to the return dict |
183 233 | continue |
184 - | oldRowIdx = SummaryMinor.getRowDescriptionsOldOrder().index(desc) |
234 + | oldRowIdx = SummaryMinor._getRowDescriptionsOldOrder(uss).index(desc) |
185 235 | for colIdx in range(ncols): |
186 236 | chan = int(matrix[oldChanIdx][colIdx]) |
187 - | pol = int(matrix[oldPolIdx][colIdx]) |
237 + | if (uss): |
238 + | pol = 0 |
239 + | else: |
240 + | pol = int(matrix[oldPolIdx][colIdx]) |
188 241 | val = matrix[oldRowIdx][colIdx] |
189 242 | cummulativeCol = int(cummulativeCnt[chan][pol][desc][0]) # const 0: cummulativeCnt doesn't make use of 'cycle' index from copied ret structure |
190 243 | ret[chan][pol][desc][cummulativeCol] = val |
191 244 | cummulativeCnt[chan][pol][desc][0] += 1 |
192 245 | |
193 246 | return ret |
194 247 | |
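A short sketch of how the re-indexed structure returned above is addressed; matrix stands for an assumed summary-minor ndarray, e.g. the single-field matrix from getMatrix():

    by_subimage = SummaryMinor.indexMinorCycleSummaryBySubimage(matrix)
    # by_subimage[chan][pol][row][cycle]; assumes channel 0 / polarity 0 exist in the data
    peak_res_chan0 = by_subimage[0][0]["peakRes"]   # one entry per minor cycle for that subimage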
195 248 | def _getPerCycleDict(summaryminor_dict, calc_iterdone_deltas=None, keep_startvals=None): |
196 249 | calc_iterdone_deltas = True if (calc_iterdone_deltas == None) else calc_iterdone_deltas |
197 250 | keep_startvals = True if (keep_startvals == None) else keep_startvals |
198 251 | ret = summaryminor_dict |
199 - | availRows = SummaryMinor.getRowDescriptionsOldOrder() |
252 + | uss = SummaryMinor.useSmallSummaryminor() # Temporary CAS-13683 workaround |
253 + | availRows = SummaryMinor._getRowDescriptionsOldOrder(uss) |
200 254 | |
201 255 | if (calc_iterdone_deltas) and ("startIterDone" in availRows): |
202 256 | for chan in ret: |
203 257 | for pol in ret[chan]: |
204 258 | for cyc in range(len(ret[chan][pol]["startIterDone"])): |
205 259 | ret[chan][pol]["iterDone"][cyc] -= ret[chan][pol]["startIterDone"][cyc] |
206 260 | if not keep_startvals: |
207 261 | for chan in ret: |
208 262 | for pol in ret[chan]: |
209 - | for desc in SummaryMinor.getRowStartDescs(): |
263 + | for desc in SummaryMinor._getRowStartDescs(uss): |
210 264 | del ret[chan][pol][desc] |
211 265 | |
212 266 | return ret |
213 267 | |
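For illustration, a hedged sketch of what the two flags do to the dictionary produced by indexMinorCycleSummaryBySubimage (by_subimage is the hypothetical dict from the previous sketch):

    per_cycle = SummaryMinor._getPerCycleDict(by_subimage, calc_iterdone_deltas=True, keep_startvals=False)
    # iterDone is now the number of iterations done in each cycle (cumulative count minus startIterDone),
    # and the start* rows (startIterDone, startPeakRes, ...) have been removed.

Note that the function works on the dictionary it is given rather than a copy, so by_subimage and per_cycle refer to the same object.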
214 268 | def getOutlierField(self, fieldId): |
215 269 | """Get a new SummaryMinor instance for the given fieldId. |
216 270 | |
217 271 | Example: |
218 272 | nchan_tot = 0 |
219 273 | for field_id in summ_min.fieldIds: |