Source
xxxxxxxxxx
904
904
tblocal.putinfo(readmerec)
905
905
tblocal.close()
906
906
907
907
# Check if the axis was correctly added
908
908
check_axis = axisType(mmsname)
909
909
910
910
if check_axis != axis:
911
911
return False
912
912
913
913
return True
914
-
915
-
914
+
915
+
def buildScanDDIMap(scanSummary, ddIspectralWindowInfo):
916
+
"""
917
+
Builds a scan->DDI map and 3 list of # visibilities per DDI, scan, field
918
+
919
+
:param scanSummary: scan summary dictionary as produced by the mstool (getscansummary)
920
+
:param ddiSpectralWindowInfo: SPW info dictionary as produced by the mstool
921
+
(getspectralwindowinfo())
922
+
:returns: a dict with a scan->ddi map, and three dict with # of visibilities per
923
+
ddi, scan, and field.
924
+
"""
925
+
# Make an array for total number of visibilites per ddi and scan separatelly
926
+
nVisPerDDI = {}
927
+
nVisPerScan = {}
928
+
nVisPerField = {}
929
+
930
+
# Iterate over scan list
931
+
scanDdiMap = {}
932
+
for scan in sorted(scanSummary):
933
+
# Initialize scan sub-map
934
+
scanDdiMap[scan] = {}
935
+
# Iterate over timestamps for this scan
936
+
for timestamp in scanSummary[scan]:
937
+
# Get list of ddis for this timestamp
938
+
DDIds = scanSummary[scan][timestamp]['DDIds']
939
+
fieldId = str(scanSummary[scan][timestamp]['FieldId'])
940
+
# Get number of rows per ddi (assume all DDIs have the same number of rows)
941
+
# In ALMA data WVR DDI has only one row per antenna but it is separated from the other DDIs
942
+
nrowsPerDDI = scanSummary[scan][timestamp]['nRow'] / len(DDIds)
943
+
))
944
+
# Iterate over DDIs for this timestamp
945
+
for ddi in DDIds:
946
+
# Convert to string to be used as a map key
947
+
ddi = str(ddi)
948
+
# Check if DDI entry is already present for this scan, otherwise initialize it
949
+
if not scanDdiMap[scan].has_key(ddi):
950
+
scanDdiMap[scan][ddi] = {}
951
+
scanDdiMap[scan][ddi]['nVis'] = 0
952
+
scanDdiMap[scan][ddi]['fieldId'] = fieldId
953
+
scanDdiMap[scan][ddi]['isWVR'] = ddIspectralWindowInfo[ddi]['isWVR']
954
+
# Calculate number of visibilities
955
+
nvis = nrowsPerDDI*ddIspectralWindowInfo[ddi]['NumChan']*ddIspectralWindowInfo[ddi]['NumCorr']
956
+
# Add number of rows and vis from this timestamp
957
+
scanDdiMap[scan][ddi]['nVis'] = scanDdiMap[scan][ddi]['nVis'] + nvis
958
+
# Update ddi nvis
959
+
if not nVisPerDDI.has_key(ddi):
960
+
nVisPerDDI[ddi] = nvis
961
+
else:
962
+
nVisPerDDI[ddi] = nVisPerDDI[ddi] + nvis
963
+
# Update scan nvis
964
+
if not nVisPerScan.has_key(scan):
965
+
nVisPerScan[scan] = nvis
966
+
else:
967
+
nVisPerScan[scan] = nVisPerScan[scan] + nvis
968
+
# Update field nvis
969
+
if not nVisPerField.has_key(fieldId):
970
+
nVisPerField[fieldId] = nvis
971
+
else:
972
+
nVisPerField[fieldId] = nVisPerField[fieldId] + nvis
973
+
974
+
return scanDdiMap, nVisPerDDI, nVisPerScan, nVisPerField
975
+
916
976
def getPartitionMap(msfilename, nsubms, selection={}, axis=['field','spw','scan'],plotMode=0):
917
977
"""Generates a partition scan/spw map to obtain optimal load balancing with the following criteria:
918
978
919
979
1st - Maximize the scan/spw/field distribution across sub-MSs
920
980
2nd - Generate sub-MSs with similar size
921
981
922
982
In order to balance better the size of the subMSs the allocation process
923
983
iterates over the scan,spw pairs in descending number of visibilities.
924
984
925
985
That is larger chunks are allocated first, and smaller chunks at the final
955
1015
956
1016
# Close ms tool
957
1017
myMsTool.close()
958
1018
959
1019
960
1020
# Get list of WVR SPWs using the ms metadata tool
961
1021
myMsMetaDataTool = msmdtool()
962
1022
myMsMetaDataTool.open(msfilename)
963
1023
wvrspws = myMsMetaDataTool.wvrspws()
964
1024
myMsMetaDataTool.close()
965
-
966
-
1025
+
967
1026
# Mark WVR DDIs as identified by the ms metadata tool
968
1027
for ddi in ddIspectralWindowInfo:
969
1028
if ddIspectralWindowInfo[ddi] in wvrspws:
970
1029
ddIspectralWindowInfo[ddi]['isWVR'] = True
971
1030
else:
972
1031
ddIspectralWindowInfo[ddi]['isWVR'] = False
973
1032
974
-
# Make an array for total number of visibilites per ddi and scan separatelly
975
-
nVisPerDDI = {}
976
-
nVisPerScan = {}
977
-
nVisPerField = {}
978
-
979
-
980
-
# Iterate over scan list
981
-
scanDdiMap = {}
982
-
for scan in scanSummary:
983
-
# Initialize scan sub-map
984
-
scanDdiMap[scan] = {}
985
-
# Iterate over timestamps for this scan
986
-
for timestamp in scanSummary[scan]:
987
-
# Get list of ddis for this timestamp
988
-
DDIds = scanSummary[scan][timestamp]['DDIds']
989
-
fieldId = str(scanSummary[scan][timestamp]['FieldId'])
990
-
# Get number of rows per ddi (assume all DDIs have the same number of rows)
991
-
# In ALMA data WVR DDI has only one row per antenna but it is separated from the other DDIs
992
-
nrowsPerDDI = scanSummary[scan][timestamp]['nRow'] / len(DDIds)
993
-
# Iterate over DDIs for this timestamp
994
-
for ddi in DDIds:
995
-
# Convert to string to be used as a map key
996
-
ddi = str(ddi)
997
-
# Check if DDI entry is already present for this scan, otherwise initialize it
998
-
if not scanDdiMap[scan].has_key(ddi):
999
-
scanDdiMap[scan][ddi] = {}
1000
-
scanDdiMap[scan][ddi]['nVis'] = 0
1001
-
scanDdiMap[scan][ddi]['fieldId'] = fieldId
1002
-
scanDdiMap[scan][ddi]['isWVR'] = ddIspectralWindowInfo[ddi]['isWVR']
1003
-
# Calculate number of visibilities
1004
-
nvis = nrowsPerDDI*ddIspectralWindowInfo[ddi]['NumChan']*ddIspectralWindowInfo[ddi]['NumCorr']
1005
-
# Add number of rows and vis from this timestamp
1006
-
scanDdiMap[scan][ddi]['nVis'] = scanDdiMap[scan][ddi]['nVis'] + nvis
1007
-
# Update ddi nvis
1008
-
if not nVisPerDDI.has_key(ddi):
1009
-
nVisPerDDI[ddi] = nvis
1010
-
else:
1011
-
nVisPerDDI[ddi] = nVisPerDDI[ddi] + nvis
1012
-
# Update scan nvis
1013
-
if not nVisPerScan.has_key(scan):
1014
-
nVisPerScan[scan] = nvis
1015
-
else:
1016
-
nVisPerScan[scan] = nVisPerScan[scan] + nvis
1017
-
# Update field nvis
1018
-
if not nVisPerField.has_key(fieldId):
1019
-
nVisPerField[fieldId] = nvis
1020
-
else:
1021
-
nVisPerField[fieldId] = nVisPerField[fieldId] + nvis
1022
-
1033
+
scanDdiMap, nVisPerDDI, nVisPerScan, nVisPerField = buildScanDDIMap(scanSummary,
1034
+
ddIspectralWindowInfo)
1023
1035
1024
1036
# Sort the scan/ddi pairs depending on the number of visibilities
1025
1037
ddiList = list()
1026
1038
scanList = list()
1027
1039
fieldList = list()
1028
1040
nVisList = list()
1029
1041
nScanDDIPairs = 0
1030
1042
for scan in scanDdiMap:
1031
1043
for ddi in scanDdiMap[scan]:
1032
1044
ddiList.append(ddi)
1038
1050
1039
1051
# Check that the number of available scan/ddi pairs is not greater than the number of subMSs
1040
1052
if nsubms > nScanDDIPairs:
1041
1053
casalog.post("Number of subMSs (%i) is greater than available scan,ddi pairs (%i), setting nsubms to %i"
1042
1054
% (nsubms,nScanDDIPairs,nScanDDIPairs),"WARN","getPartitionMap")
1043
1055
nsubms = nScanDDIPairs
1044
1056
1045
1057
ddiArray = np.array(ddiList)
1046
1058
scanArray = np.array(scanList)
1047
1059
nVisArray = np.array(nVisList)
1048
-
1060
+
1049
1061
nVisSortIndex = np.argsort(nVisArray)
1050
1062
nVisSortIndex[:] = nVisSortIndex[::-1]
1051
1063
1052
1064
ddiArray = ddiArray[nVisSortIndex]
1053
1065
scanArray = scanArray[nVisSortIndex]
1054
1066
nVisArray = nVisArray[nVisSortIndex]
1055
-
1056
1067
1057
1068
# Make a map for the contribution of each subMS to each scan
1058
1069
scanNvisDistributionPerSubMs = {}
1059
1070
for scan in scanSummary:
1060
1071
scanNvisDistributionPerSubMs[scan] = np.zeros(nsubms)
1061
1072
1062
1073
1063
1074
# Make a map for the contribution of each subMS to each ddi
1064
1075
ddiNvisDistributionPerSubMs = {}
1065
1076
for ddi in ddIspectralWindowInfo: