Commits
3 3 | import shutil |
4 4 | import pprint as pp |
5 5 | import traceback |
6 6 | import time |
7 7 | import numpy as np |
8 8 | from matplotlib import pyplot as plt |
9 9 | |
10 10 | try: |
11 11 | # CASA 6 |
12 12 | from casatasks import casalog |
13 - | from casatools.platform import bytes2str |
14 13 | from casatools import table, ms, msmetadata |
14 + | from casatools.platform import bytes2str |
15 15 | |
16 16 | import subprocess |
17 17 | use_old_casa5_commands = False |
18 + | |
19 + | mst_local = ms() |
20 + | tbt_local = table() |
21 + | msmdt_local = msmetadata() |
18 22 | except ImportError: |
19 23 | # CASA 5 |
20 24 | from __main__ import * |
21 25 | from taskinit import * |
22 26 | |
23 27 | import commands |
24 28 | use_old_casa5_commands = True |
25 29 | |
30 + | mst_local = mstool() |
31 + | tbt_local = tbtool() |
32 + | msmdt_local = msmdtool() |
26 33 | |
27 34 | class convertToMMS(): |
28 35 | def __init__(self,\ |
29 36 | inpdir=None, \ |
30 37 | mmsdir=None, \ |
31 38 | axis='auto', \ |
32 39 | numsubms=4, |
33 40 | # createmslink=False, \ |
34 41 | cleanup=False): |
35 42 | |
352 359 | # getSpwIds 'Get the Spw IDs of a scan.' |
353 360 | # getDiskUsage 'Return the size of an MS on disk, as reported by du -hs.' |
354 361 | # |
355 362 | # ---------------------------------------------------------------------- |
356 363 | |
357 364 | # def getNumberOf(msfile, item='row'): |
358 365 | # '''Using the msmd tool, it gets the number of |
359 366 | # scan, spw, antenna, baseline, field, state, |
360 367 | # channel, row in a MS or MMS''' |
361 368 | # |
362 - | # md = msmdtool() |
369 + | # md = msmdtool() # or msmetadata() in CASA 6 |
363 370 | # try: |
364 371 | # md.open(msfile) |
365 372 | # except: |
366 373 | # print('Cannot open the msfile') |
367 374 | # return 0 |
368 375 | # |
369 376 | # if item == 'row': |
370 377 | # numof = md.nrows() |
371 378 | # elif item == 'scan': |
372 379 | # numof = md.nscans() |
410 417 | slist.add(j) |
411 418 | |
412 419 | return list(slist) |
413 420 | |
414 421 | def getScanList(msfile, selection={}): |
415 422 | '''Get the list of scans of an MS or MMS. |
416 423 | msfile --> name of MS or MMS |
417 424 | selection --> dictionary with data selection |
418 425 | |
419 426 | Return a list of the scans in this MS/MMS. ''' |
420 - | |
421 - | msTool=mstool() |
422 - | msTool.open(msfile) |
427 + | |
428 + | mst_local.open(msfile) |
423 429 | if isinstance(selection, dict) and selection != {}: |
424 - | msTool.msselect(items=selection) |
430 + | mst_local.msselect(items=selection) |
425 431 | |
426 - | scand = msTool.getscansummary() |
427 - | msTool.close() |
432 + | scand = mst_local.getscansummary() |
433 + | mst_local.close() |
428 434 | |
429 435 | scanlist = scand.keys() |
430 436 | |
431 437 | return scanlist |
432 438 | |
433 439 | |
434 440 | def getScanNrows(msfile, myscan, selection={}): |
435 441 | '''Get the number of rows of a scan in a MS. It will add the nrows of all sub-scans. |
436 442 | This will not take into account any selection done on the MS. |
437 443 | msfile --> name of the MS or MMS |
442 448 | |
443 449 | To compare with the dictionary returned by listpartition, do the following: |
444 450 | |
445 451 | resdict = listpartition('file.mms', createdict=True) |
446 452 | slist = ph.getMMSScans(resdict) |
447 453 | for s in slist: |
448 454 | mmsN = ph.getMMSScanNrows(resdict, s) |
449 455 | msN = ph.getScanNrows('referenceMS', s) |
450 456 | assert (mmsN == msN) |
451 457 | ''' |
452 - | msTool=mstool() |
453 - | msTool.open(msfile) |
458 + | mst_local.open(msfile) |
454 459 | if isinstance(selection, dict) and selection != {}: |
455 - | msTool.msselect(items=selection) |
460 + | mst_local.msselect(items=selection) |
456 461 | |
457 - | scand = msTool.getscansummary() |
458 - | msTool.close() |
462 + | scand = mst_local.getscansummary() |
463 + | mst_local.close() |
459 464 | |
460 465 | Nrows = 0 |
461 466 | if not str(myscan) in scand: |
462 467 | return Nrows |
463 468 | |
464 469 | subscans = scand[str(myscan)] |
465 470 | for ii in subscans.keys(): |
466 471 | Nrows += scand[str(myscan)][ii]['nRow'] |
467 472 | |
468 473 | return Nrows |
491 496 | '''Get the Spw IDs of a scan. |
492 497 | msfile --> name of the MS or MMS |
493 498 | myscan --> scan Id (int) |
494 499 | selection --> dictionary with data selection |
495 500 | |
496 501 | Return a list with the Spw IDs. Note that the returned spw IDs are sorted. |
497 502 | |
498 503 | ''' |
499 504 | import numpy as np |
500 505 | |
501 - | msTool=mstool() |
502 - | msTool.open(msfile) |
506 + | mst_local.open(msfile) |
503 507 | if isinstance(selection, dict) and selection != {}: |
504 - | msTool.msselect(items=selection) |
505 - | |
506 - | scand = msTool.getscansummary() |
507 - | msTool.close() |
508 + | mst_local.msselect(items=selection) |
509 + | |
510 + | scand = mst_local.getscansummary() |
511 + | mst_local.close() |
508 512 | |
509 513 | spwlist = [] |
510 514 | |
511 515 | if not str(myscan) in scand: |
512 516 | return spwlist |
513 517 | |
514 518 | subscans = scand[str(myscan)] |
515 519 | aspws = np.array([],dtype=int) |
516 520 | |
517 521 | for ii in subscans.keys(): |
549 553 | 'spwIds': array([0])}, |
550 554 | 2: {'nchans': array([63]), |
551 555 | 'nrows': 1890, |
552 556 | 'spwIds': array([0])}}, |
553 557 | 'size': '72M'}} |
554 558 | """ |
555 559 | |
556 560 | if mslist == []: |
557 561 | return {} |
558 562 | |
559 - | mslocal1 = casac.ms() |
560 - | |
561 563 | # Create lists for scan and spw dictionaries of each MS |
562 564 | msscanlist = [] |
563 565 | msspwlist = [] |
564 566 | |
565 567 | # List with sizes in bytes per sub-MS |
566 568 | sizelist = [] |
567 569 | |
568 570 | # Loop through all MSs |
569 571 | for subms in mslist: |
570 572 | try: |
571 - | mslocal1.open(subms) |
572 - | scans = mslocal1.getscansummary() |
573 + | mst_local.open(subms) |
574 + | scans = mst_local.getscansummary() |
573 575 | msscanlist.append(scans) |
574 - | spws = mslocal1.getspectralwindowinfo() |
576 + | spws = mst_local.getspectralwindowinfo() |
575 577 | msspwlist.append(spws) |
576 - | mslocal1.close() |
577 - | except: |
578 - | mslocal1.close() |
579 - | raise Exception('Cannot get scan/spw information from subMS') |
578 + | except Exception as exc: |
579 + | raise Exception('Cannot get scan/spw information from subMS: {0}'.format(exc)) |
580 + | finally: |
581 + | mst_local.close() |
580 582 | |
581 583 | # Get the data volume in bytes per sub-MS |
582 584 | sizelist.append(getDiskUsage(subms)) |
583 585 | |
584 586 | # Get the information to list in output |
585 587 | # Dictionary to return |
586 588 | outdict = {} |
587 589 | |
588 590 | for ims in range(mslist.__len__()): |
589 591 | # Create temp dictionary for each sub-MS |
704 706 | """Return the human-readable size (as printed by du -hs, e.g. '75M') of an MS or MMS on disk. |
705 707 | |
706 708 | Keyword arguments: |
707 709 | msfile --> name of the MS |
708 710 | This function will return a value given by the command du -hs |
709 711 | """ |
710 712 | |
711 713 | from subprocess import Popen, PIPE, STDOUT |
712 714 | |
713 715 | # Command line to run |
714 - | ducmd = 'du -hs '+msfile |
716 + | ducmd = 'du -hs {0}'.format(msfile) |
715 717 | |
716 718 | if use_old_casa5_commands: |
717 719 | p = Popen(ducmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True) |
718 720 | sizeline = p.stdout.read() |
719 721 | _out, _err = p.communicate() |
720 722 | else: |
721 723 | p = Popen(ducmd, shell=True, stdin=None, stdout=PIPE, stderr=STDOUT, close_fds=True) |
722 724 | o, e = p.communicate() ### previously 'sizeline = p.stdout.read()' here |
723 725 | ### left process running... |
724 726 | sizeline = bytes2str(o.split( )[0]) |
725 727 | |
726 728 | # Create a list of the output string, which looks like this: |
727 729 | # ' 75M\tuidScan23.data/uidScan23.0000.ms\n' |
728 730 | # This will create a list with [size,sub-ms] |
729 731 | mssize = sizeline.split() |
730 732 | |
731 733 | return mssize[0] |
732 734 | |
733 735 | |
734 736 | def getSubtables(vis): |
735 - | tbTool = tbtool() |
736 737 | theSubTables = [] |
737 - | tbTool.open(vis) |
738 - | myKeyw = tbTool.getkeywords() |
739 - | tbTool.close() |
738 + | tbt_local.open(vis) |
739 + | myKeyw = tbt_local.getkeywords() |
740 + | tbt_local.close() |
740 741 | for k in myKeyw.keys(): |
741 742 | theKeyw = myKeyw[k] |
742 743 | if (type(theKeyw)==str and theKeyw.split(' ')[0]=='Table:' |
743 744 | and not k=='SORTED_TABLE'): |
744 745 | theSubTables.append(os.path.basename(theKeyw.split(' ')[1])) |
745 746 | |
746 747 | return theSubTables |
747 748 | |
748 749 | |
749 750 | def makeMMS(outputvis, submslist, copysubtables=False, omitsubtables=[], parallelaxis=''): |
762 763 | """ |
763 764 | |
764 765 | if os.path.exists(outputvis): |
765 766 | raise ValueError('Output MS already exists') |
766 767 | |
767 768 | if len(submslist)==0: |
768 769 | raise ValueError('No SubMSs given') |
769 770 | |
770 771 | ## make an MMS with all sub-MSs contained in a SUBMSS subdirectory |
771 772 | origpath = os.getcwd() |
772 - | mymstool = mstool() |
773 - | mytbtool = tbtool() |
774 773 | |
775 774 | try: |
776 775 | try: |
777 - | mymstool.createmultims(outputvis, |
778 - | submslist, |
779 - | [], |
780 - | True, # nomodify |
781 - | False, # lock |
782 - | copysubtables, |
783 - | omitsubtables |
784 - | ) # when copying the subtables, omit these |
785 - | except: |
786 - | mymstool.close() |
776 + | mst_local.createmultims(outputvis, |
777 + | submslist, |
778 + | [], |
779 + | True, # nomodify |
780 + | False, # lock |
781 + | copysubtables, |
782 + | omitsubtables |
783 + | ) # when copying the subtables, omit these |
784 + | |
785 + | except Exception: |
787 786 | raise |
788 - | mymstool.close() |
787 + | finally: |
788 + | mst_local.close() |
789 789 | |
790 790 | # remove the SORTED_TABLE keywords because the sorting is not reliable after partitioning |
791 791 | try: |
792 - | mytbtool.open(outputvis, nomodify=False) |
793 - | if 'SORTED_TABLE' in mytbtool.keywordnames(): |
794 - | mytbtool.removekeyword('SORTED_TABLE') |
795 - | mytbtool.close() |
792 + | tbt_local.open(outputvis, nomodify=False) |
793 + | if 'SORTED_TABLE' in tbt_local.keywordnames(): |
794 + | tbt_local.removekeyword('SORTED_TABLE') |
795 + | tbt_local.close() |
796 + | |
796 797 | for thesubms in submslist: |
797 - | mytbtool.open(outputvis+'/SUBMSS/'+os.path.basename(thesubms), nomodify=False) |
798 - | if 'SORTED_TABLE' in mytbtool.keywordnames(): |
799 - | tobedel = mytbtool.getkeyword('SORTED_TABLE').split(' ')[1] |
800 - | mytbtool.removekeyword('SORTED_TABLE') |
798 + | tbt_local.open(outputvis+'/SUBMSS/'+os.path.basename(thesubms), nomodify=False) |
799 + | if 'SORTED_TABLE' in tbt_local.keywordnames(): |
800 + | tobedel = tbt_local.getkeyword('SORTED_TABLE').split(' ')[1] |
801 + | tbt_local.removekeyword('SORTED_TABLE') |
801 802 | os.system('rm -rf '+tobedel) |
802 - | mytbtool.close() |
803 - | except: |
804 - | mytbtool.close() |
803 + | tbt_local.close() |
804 + | except Exception: |
805 + | tbt_local.close() |
805 806 | raise |
806 807 | |
807 808 | # Create symbolic links to the subtables of the first SubMS in the reference MS (top one) |
808 809 | os.chdir(outputvis) |
809 810 | mastersubms = os.path.basename(submslist[0].rstrip('/')) |
810 811 | thesubtables = getSubtables('SUBMSS/'+mastersubms) |
811 812 | |
812 813 | for s in thesubtables: |
813 814 | os.symlink('SUBMSS/'+mastersubms+'/'+s, s) |
814 815 | |
815 816 | os.chdir('SUBMSS/'+mastersubms) |
816 817 | |
817 818 | # Remove the SOURCE and HISTORY tables, which should not be linked |
818 819 | thesubtables.remove('SOURCE') |
819 820 | thesubtables.remove('HISTORY') |
820 821 | |
821 822 | # Create sym links to all sub-tables in all subMSs |
822 - | for i in xrange(1,len(submslist)): |
823 + | for i in range(1,len(submslist)): |
823 824 | thesubms = os.path.basename(submslist[i].rstrip('/')) |
824 825 | os.chdir('../'+thesubms) |
825 826 | |
826 827 | for s in thesubtables: |
827 828 | os.system('rm -rf '+s) |
828 829 | os.symlink('../'+mastersubms+'/'+s, s) |
829 830 | |
830 831 | # Write the AxisType info in the MMS |
831 832 | if parallelaxis != '': |
832 833 | setAxisType(outputvis, parallelaxis) |
833 834 | |
834 - | except: |
835 - | theproblem = str(sys.exc_info()) |
835 + | except Exception as exc: |
836 836 | os.chdir(origpath) |
837 - | raise ValueError('Problem in MMS creation: {0}'.format(theproblem)) |
837 + | raise ValueError('Problem in MMS creation: {0}'.format(exc)) |
838 838 | |
839 839 | os.chdir(origpath) |
840 840 | |
841 841 | return True |
842 842 | |
843 843 | def axisType(mmsname): |
844 844 | """Get the axisType information from a Multi-MS. The AxisType information |
845 845 | is usually added for Multi-MS with the axis which data is parallelized across. |
846 846 | |
847 847 | Keyword arguments: |
848 848 | mmsname -- name of the Multi-MS |
849 849 | |
850 850 | It returns the value of AxisType or an empty string if it doesn't exist. |
851 851 | """ |
852 - | tblocal = tbtool() |
853 852 | |
854 853 | axis = '' |
855 854 | |
856 855 | try: |
857 - | tblocal.open(mmsname, nomodify=True) |
858 - | except: |
859 - | raise ValueError('Unable to open table {0}'.format(mmsname)) |
860 - | |
861 - | tbinfo = tblocal.info() |
862 - | tblocal.close() |
856 + | tbt_local.open(mmsname, nomodify=True) |
857 + | tbinfo = tbt_local.info() |
858 + | except Exception as exc: |
859 + | raise ValueError('Unable to open table {0}. Exception: {1}'.format(mmsname, exc)) |
860 + | finally: |
861 + | tbt_local.close() |
863 862 | |
864 863 | if 'readme' in tbinfo: |
865 864 | readme = tbinfo['readme'] |
866 865 | readlist = readme.splitlines() |
867 866 | for val in readlist: |
868 867 | if val.__contains__('AxisType'): |
869 868 | a,b,axis = val.partition('=') |
870 869 | |
871 - | |
872 870 | return axis.strip() |
873 871 | |
874 872 | def setAxisType(mmsname, axis=''): |
875 873 | """Set the AxisType keyword in a Multi-MS info. If AxisType already |
876 874 | exists, it will be overwritten. |
877 875 | |
878 876 | Keyword arguments: |
879 877 | mmsname -- name of the Multi-MS |
880 878 | axis -- parallel axis of the Multi-MS. Options: scan; spw or scan,spw |
881 879 | |
882 880 | Return True on success, False otherwise. |
883 881 | """ |
884 882 | |
883 + | import copy |
884 + | |
885 885 | if axis == '': |
886 886 | raise ValueError('Axis value cannot be empty') |
887 887 | |
888 - | tblocal = tbtool() |
889 888 | try: |
890 - | tblocal.open(mmsname, nomodify=False) |
891 - | except: |
892 - | raise ValueError('Unable to open table {0}'.format(mmsname)) |
889 + | tbt_local.open(mmsname) |
890 + | tbinfo = tbt_local.info() |
891 + | except Exception as exc: |
892 + | raise ValueError('Unable to open table {0}. Exception: {1}'.format(mmsname, exc)) |
893 + | finally: |
894 + | tbt_local.close() |
893 895 | |
894 - | import copy |
895 - | |
896 - | tbinfo = tblocal.info() |
897 896 | readme = '' |
898 897 | # Save original readme |
899 898 | if 'readme' in tbinfo: |
900 899 | readme = tbinfo['readme'] |
901 900 | |
902 901 | # Check if AxisType already exist and remove it |
903 902 | if axisType(mmsname) != '': |
904 903 | print('WARN: Will overwrite the existing AxisType value') |
905 904 | readlist = readme.splitlines() |
906 905 | newlist = copy.deepcopy(readlist) |
907 906 | for val in newlist: |
908 907 | if val.__contains__('AxisType'): |
909 908 | readlist.remove(val) |
910 909 | |
911 910 | # Recreate the string |
912 911 | nr = '' |
913 912 | for val in readlist: |
914 913 | nr = nr + val + '\n' |
915 914 | |
916 - | readme=nr.rstrip() |
915 + | readme = nr.rstrip() |
917 916 | |
918 917 | |
919 918 | # Preset for axis info |
920 919 | axisInfo = "AxisType = " |
921 920 | axis.rstrip() |
922 921 | axisInfo = axisInfo + axis + '\n' |
923 922 | |
924 923 | # New readme |
925 924 | newReadme = axisInfo + readme |
926 925 | |
927 926 | # Create readme record |
928 927 | readmerec = {'readme':newReadme} |
929 - | |
930 - | tblocal.putinfo(readmerec) |
931 - | tblocal.close() |
928 + | |
929 + | try: |
930 + | tbt_local.open(mmsname, nomodify=False) |
931 + | tbt_local.putinfo(readmerec) |
932 + | except Exception as exc: |
933 + | raise ValueError('Unable to put readme info into table {0}. Exception: {1}'. |
934 + | format(mmsname, exc)) |
935 + | finally: |
936 + | tbt_local.close() |
932 937 | |
933 938 | # Check if the axis was correctly added |
934 939 | check_axis = axisType(mmsname) |
935 940 | |
936 941 | if check_axis != axis: |
937 942 | return False |
938 943 | |
939 944 | return True |
940 945 | |
941 946 | def buildScanDDIMap(scanSummary, ddIspectralWindowInfo): |
993 998 | # Update field nvis |
994 999 | if fieldId not in nVisPerField: |
995 1000 | nVisPerField[fieldId] = nvis |
996 1001 | else: |
997 1002 | nVisPerField[fieldId] = nVisPerField[fieldId] + nvis |
998 1003 | |
999 1004 | return scanDdiMap, nVisPerDDI, nVisPerScan, nVisPerField |
1000 1005 | |
1001 1006 | def getPartitionMap(msfilename, nsubms, selection={}, axis=['field','spw','scan'],plotMode=0): |
1002 1007 | """Generates a partition scan/spw map to obtain optimal load balancing with the following criteria: |
1003 - | |
1008 + | |
1004 1009 | 1st - Maximize the scan/spw/field distribution across sub-MSs |
1005 1010 | 2nd - Generate sub-MSs with similar size |
1006 - | |
1011 + | |
1007 1012 | In order to balance better the size of the subMSs the allocation process |
1008 1013 | iterates over the scan,spw pairs in descending number of visibilities. |
1009 - | |
1014 + | |
1010 1015 | That is, larger chunks are allocated first, and smaller chunks at the final |
1011 1016 | stages so that they can be used to balance the load in a stable way |
1012 - | |
1017 + | |
1013 1018 | Keyword arguments: |
1014 1019 | msfilename -- Input MS filename |
1015 - | nsubms -- Number of subMSs |
1020 + | nsubms -- Number of subMSs |
1016 1021 | selection -- Data selection dictionary |
1017 1022 | axis -- Vector of strings containing the axis for load distribution (scan,spw,field) |
1018 1023 | plotMode -- Integer in the range 0-3 to determine the plot generation mode |
1019 1024 | 0 - Don't generate any plots |
1020 1025 | 1 - Show plots but don't save them |
1021 1026 | 2 - Save plots but don't show them |
1022 1027 | 3 - Show and save plots |
1023 - | |
1028 + | |
1024 1029 | Returns a map of the sub-MSs with the corresponding scan/spw selections and the number of visibilities |
1025 1030 | """ |
1026 - | |
1031 + | |
1027 1032 | # Open ms tool |
1028 - | myMsTool=mstool() |
1029 - | myMsTool.open(msfilename) |
1030 - | |
1031 - | |
1033 + | mst_local.open(msfilename) |
1034 + | |
1032 1035 | # Apply data selection |
1033 1036 | if isinstance(selection, dict) and selection != {}: |
1034 - | myMsTool.msselect(items=selection) |
1035 - | |
1036 - | |
1037 + | mst_local.msselect(items=selection) |
1038 + | |
1037 1039 | # Get list of DDIs and timestamps per scan |
1038 - | scanSummary = myMsTool.getscansummary() |
1039 - | ddIspectralWindowInfo = myMsTool.getspectralwindowinfo() |
1040 + | scanSummary = mst_local.getscansummary() |
1041 + | ddIspectralWindowInfo = mst_local.getspectralwindowinfo() |
1040 1042 | |
1041 1043 | # Close ms tool |
1042 - | myMsTool.close() |
1043 - | |
1044 - | |
1045 - | # Get list of WVR SPWs using the ms metadata tool |
1046 - | myMsMetaDataTool = msmdtool() |
1047 - | myMsMetaDataTool.open(msfilename) |
1048 - | wvrspws = myMsMetaDataTool.wvrspws() |
1049 - | myMsMetaDataTool.close() |
1044 + | mst_local.close() |
1045 + | |
1046 + | # Get list of WVR SPWs using the ms metadata tool |
1047 + | msmdt_local.open(msfilename) |
1048 + | wvrspws = msmdt_local.wvrspws() |
1049 + | msmdt_local.close() |
1050 1050 | |
1051 1051 | # Mark WVR DDIs as identified by the ms metadata tool |
1052 1052 | for ddi in ddIspectralWindowInfo: |
1053 1053 | if ddIspectralWindowInfo[ddi] in wvrspws: |
1054 1054 | ddIspectralWindowInfo[ddi]['isWVR'] = True |
1055 1055 | else: |
1056 1056 | ddIspectralWindowInfo[ddi]['isWVR'] = False |
1057 - | |
1057 + | |
1058 1058 | scanDdiMap, nVisPerDDI, nVisPerScan, nVisPerField = buildScanDDIMap(scanSummary, |
1059 1059 | ddIspectralWindowInfo) |
1060 1060 | |
1061 1061 | # Sort the scan/ddi pairs depending on the number of visibilities |
1062 1062 | ddiList = list() |
1063 1063 | scanList = list() |
1064 1064 | fieldList = list() |
1065 1065 | nVisList = list() |
1066 1066 | nScanDDIPairs = 0 |
1067 1067 | for scan in scanDdiMap: |