# # This file implements a plugin for nose, which monitors the net memory leaked # and opened file descriptors from a test run. The result are written to # an XML file. This plugin is just an adaptation of nose's built-in # XUnit plugin. # import nose.plugins.xunit import traceback import inspect import time import os import subprocess import sys import codecs def nice_classname(obj): """Returns a nice name for class object or class instance. >>> nice_classname(Exception()) # doctest: +ELLIPSIS '...Exception' >>> nice_classname(Exception) 'exceptions.Exception' """ if inspect.isclass(obj): cls_name = obj.__name__ else: cls_name = obj.__class__.__name__ mod = inspect.getmodule(obj) if mod: name = mod.__name__ # jython if name.startswith('org.python.core.'): name = name[len('org.python.core.'):] return "%s.%s" % (name, cls_name) else: return cls_name def write_message(fileleak, memoryleak): if fileleak != 0: print "Net file descriptors opened:", fileleak if memoryleak != 0: print "Net memory allocated:", memoryleak, "kB" return "\ <measurement><name>Files leaked</name><value>" + str(fileleak) + "</value></measurement>\ <measurement><name>Memory leaked (kB)</name><value>" + str(memoryleak) + "</value></measurement>\ " class MemTest(nose.plugins.xunit.Xunit): name = "memtest" def options(self, parser, env): # Identical to the base class method, except that the command line # option needs to be called something different """Sets additional command line options.""" nose.plugins.base.Plugin.options(self, parser, env) parser.add_option( '--memtest-file', action='store', dest='xunit_file', metavar="FILE", default=env.get('NOSE_XUNIT_FILE', 'nosetests.xml'), help=("Path to xml file to store the xunit report in. " "Default is nosetests.xml in the working directory " "[NOSE_XUNIT_FILE]")) # Find the lsof executable self.lsof = "/usr/sbin/lsof" if not os.path.exists(self.lsof): self.lsof = "/usr/bin/lsof" if not os.path.exists(self.lsof): print "Warning: Could not find lsof at /usr/sbin/lsof or /usr/bin/lsof" def report(self, stream): """Writes an Xunit-formatted XML file The file includes a report of test errors and failures. """ if os.environ.get('NOSE_XUNIT_FILE'): self.error_report_file = codecs.open(os.environ['NOSE_XUNIT_FILE'], 'w',self.encoding, 'replace') else: self.error_report_file = codecs.open("nosetests.xml", 'w',self.encoding, 'replace') self.stats['encoding'] = self.encoding self.stats['total'] = (self.stats['errors'] + self.stats['failures'] + self.stats['passes'] + self.stats['skipped']) self.error_report_file.write( '' '' % self.encoding) self.error_report_file.write(''.join(self.errorlist)) self.error_report_file.write('') self.error_report_file.close() if self.config.verbosity > 1: stream.writeln("-" * 70) stream.writeln("XML: %s" % self.error_report_file.name) @staticmethod def _getstatusoutput(cmd): p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) stdoutdata, stderrdata = p.communicate(None) if stderrdata is not None and len(stderrdata) > 0: print >> sys.stderr, stderrdata return (p.returncode, stdoutdata) def startTest(self, test): """Initializes a timer before starting a test.""" self._timer = time.time() self._pid = os.getpid() msg = '***** List of open files after running %s\n'%test infile = open('ListOpenFiles','a') infile.write(msg) infile.close() (errorcode, n) = self._getstatusoutput(self.lsof + ' -p ' + str(self._pid) + ' | wc -l') if errorcode == 0: self._openfiles = n else: self._openfiles = 0 (errorcode, n) = self._getstatusoutput('env -i ps -p ' + str(self._pid) + ' -o rss | tail -1') if errorcode == 0: self._resident_memory = n else: self._resident_memory = 0 def stopContext(self, context): try: out = subprocess.check_output("du -h", shell=True) except subprocess.CalledProcessError, e: out = e.output print "Directory contents after", context print out def _update_after_test(self): # The predefined hooks stopTest() and afterTest() cannot be used # because they get called after addError/addFailure/addSuccess subprocess.call( self.lsof + ' -p ' + str(self._pid) + ' | grep -i nosedir >> ListOpenFiles', shell=True) (errorcode, n) = self._getstatusoutput(self.lsof + ' -p ' + str(self._pid) + ' | wc -l') if errorcode == 0: self._fileleak = int(n) - int(self._openfiles) else: self._fileleak = -1 (errorcode, n) = self._getstatusoutput('env -i ps -p ' + str(self._pid) + ' -o rss | tail -1') if errorcode == 0: self._memoryleak = int(n) - int(self._resident_memory) else: self._memoryleak = 0 def addError(self, test, err, capt=None): """Add error output to Xunit report. """ self._update_after_test() taken = time.time() - self._timer if issubclass(err[0], nose.exc.SkipTest): self.stats['skipped'] +=1 return tb = ''.join(traceback.format_exception(*err)) self.stats['errors'] += 1 id = test.id() name = id[-id[::-1].find("."):] self.errorlist.append( '' '' '%(tb)s' % {'cls': '.'.join(id.split('.')[:-1]), 'name': name, 'errtype': nice_classname(err[0]), 'tb': tb, 'taken': taken, }) self.errorlist.append(write_message(self._fileleak, self._memoryleak)) self.errorlist.append('') def addFailure(self, test, err, capt=None, tb_info=None): """Add failure output to Xunit report. """ self._update_after_test() taken = time.time() - self._timer tb = ''.join(traceback.format_exception(*err)) self.stats['failures'] += 1 id = test.id() name = id[-id[::-1].find("."):] self.errorlist.append( '' '' '%(tb)s' % {'cls': '.'.join(id.split('.')[:-1]), 'name': name, 'errtype': nice_classname(err[0]), 'tb': tb, 'taken': taken, }) self.errorlist.append(write_message(self._fileleak, self._memoryleak)) self.errorlist.append('') def addSuccess(self, test, capt=None): """Add success output to Xunit report. """ self._update_after_test() taken = time.time() - self._timer self.stats['passes'] += 1 id = test.id() name = id[-id[::-1].find("."):] self.errorlist.append( '' '' % {'cls': '.'.join(id.split('.')[:-1]), 'name': name, 'taken': taken, }) self.errorlist.append(write_message(self._fileleak, self._memoryleak)) self.errorlist.append('')