#
# This file implements a plugin for nose, which monitors the net memory leaked
# and opened file descriptors from a test run. The results are written to
# an XML file. This plugin is just an adaptation of nose's built-in
# XUnit plugin.
#
import nose.plugins.xunit
import traceback
import inspect
import time
import os
import subprocess
import sys
import codecs
def nice_classname(obj):
"""Returns a nice name for class object or class instance.
>>> nice_classname(Exception()) # doctest: +ELLIPSIS
'...Exception'
>>> nice_classname(Exception)
'exceptions.Exception'
"""
if inspect.isclass(obj):
cls_name = obj.__name__
else:
cls_name = obj.__class__.__name__
mod = inspect.getmodule(obj)
if mod:
name = mod.__name__
# jython
if name.startswith('org.python.core.'):
name = name[len('org.python.core.'):]
return "%s.%s" % (name, cls_name)
else:
return cls_name
def write_message(fileleak, memoryleak):
    """Echo one test's leak figures and return them as XML measurements.

    A non-zero ``fileleak`` or ``memoryleak`` is printed to stdout; zero
    values are not printed.  Both values are always included in the returned
    XML fragment.

    The ``print`` calls use the single-argument form so they produce the same
    text under the Python 2 print statement and the Python 3 print function.

    :param fileleak: net number of file descriptors opened by the test.
    :param memoryleak: net memory allocated by the test (labelled kB).
    :returns: a string holding two ``<measurement>`` elements, one for the
        files leaked and one for the memory leaked.
    """
    if fileleak != 0:
        print("Net file descriptors opened: %s" % (fileleak,))
    if memoryleak != 0:
        print("Net memory allocated: %s kB" % (memoryleak,))
    return (
        "<measurement><name>Files leaked</name>"
        "<value>%s</value></measurement>"
        "<measurement><name>Memory leaked (kB)</name>"
        "<value>%s</value></measurement>" % (fileleak, memoryleak)
    )
class MemTest(nose.plugins.xunit.Xunit):
    """Xunit-style nose plugin that also records per-test resource growth.

    Around every test the plugin samples the number of lines printed by
    ``lsof -p <pid>`` and the resident set size printed by ``ps -o rss`` for
    the current process.  The differences are attached to the test's entry
    in the XML report through ``write_message``.

    NOTE(review): the XML element and attribute names written by this class
    follow the layout of nose's built-in Xunit report, which this plugin
    adapts; confirm them against whatever consumes the report.
    """

    name = "memtest"

    def options(self, parser, env):
        """Sets additional command line options."""
        # Identical to the base class method, except that the command line
        # option needs to be called something different
        nose.plugins.base.Plugin.options(self, parser, env)
        parser.add_option(
            '--memtest-file', action='store',
            dest='xunit_file', metavar="FILE",
            default=env.get('NOSE_XUNIT_FILE', 'nosetests.xml'),
            help=("Path to xml file to store the xunit report in. "
                  "Default is nosetests.xml in the working directory "
                  "[NOSE_XUNIT_FILE]"))

        # Find the lsof executable.  When neither location exists the last
        # candidate is kept so later commands still have a path to run.
        for candidate in ("/usr/sbin/lsof", "/usr/bin/lsof"):
            self.lsof = candidate
            if os.path.exists(candidate):
                break
        else:
            print("Warning: Could not find lsof at /usr/sbin/lsof or /usr/bin/lsof")

    def report(self, stream):
        """Writes an Xunit-formatted XML file

        The file includes a report of test errors and failures.  It is
        written to the path in the ``NOSE_XUNIT_FILE`` environment variable,
        or to ``nosetests.xml`` when that variable is unset or empty.
        """
        report_path = os.environ.get('NOSE_XUNIT_FILE') or "nosetests.xml"

        self.stats['encoding'] = self.encoding
        self.stats['total'] = (self.stats['errors'] + self.stats['failures']
                               + self.stats['passes'] + self.stats['skipped'])
        # Format the header before opening the file so that a formatting
        # problem cannot leave a truncated report behind.
        header = ('<?xml version="1.0" encoding="%(encoding)s"?>'
                  '<testsuite name="nosetests" tests="%(total)d" '
                  'errors="%(errors)d" failures="%(failures)d" '
                  'skip="%(skipped)d">' % self.stats)

        self.error_report_file = codecs.open(report_path, 'w',
                                             self.encoding, 'replace')
        try:
            self.error_report_file.write(header)
            self.error_report_file.write(''.join(self.errorlist))
            self.error_report_file.write('</testsuite>')
        finally:
            self.error_report_file.close()

        if self.config.verbosity > 1:
            stream.writeln("-" * 70)
            stream.writeln("XML: %s" % self.error_report_file.name)

    @staticmethod
    def _getstatusoutput(cmd):
        """Run ``cmd`` through the shell and return ``(returncode, stdout)``.

        Anything the command wrote to stderr is forwarded to ``sys.stderr``.
        """
        p = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE, shell=True)
        stdoutdata, stderrdata = p.communicate(None)
        if stderrdata:
            sys.stderr.write("%s\n" % (stderrdata,))
        return (p.returncode, stdoutdata)

    def _open_files_command(self):
        """Return the shell command counting lsof output lines for our pid."""
        return self.lsof + ' -p ' + str(self._pid) + ' | wc -l'

    def _resident_memory_command(self):
        """Return the shell command printing the rss value of our pid."""
        return 'env -i ps -p ' + str(self._pid) + ' -o rss | tail -1'

    def _sample(self, command, fallback):
        """Run ``command`` and return its output as an int, else ``fallback``.

        ``fallback`` is returned when the command exits non-zero or when its
        output is not an integer, so a failed measurement cannot raise from
        inside a result hook.
        """
        errorcode, output = self._getstatusoutput(command)
        if errorcode != 0:
            return fallback
        try:
            return int(output)
        except (TypeError, ValueError):
            return fallback

    def startTest(self, test):
        """Initializes a timer before starting a test.

        Also appends a header line for ``test`` to the ``ListOpenFiles``
        file and takes the baseline open-file and resident-memory samples
        (0 when a sample cannot be taken).
        """
        self._timer = time.time()
        self._pid = os.getpid()

        with open('ListOpenFiles', 'a') as listing:
            listing.write(
                '***** List of open files after running %s\n' % (test,))

        self._openfiles = self._sample(self._open_files_command(), 0)
        self._resident_memory = self._sample(
            self._resident_memory_command(), 0)

    def stopContext(self, context):
        """Print the output of ``du -h`` after ``context`` has finished."""
        try:
            out = subprocess.check_output("du -h", shell=True)
        except subprocess.CalledProcessError as e:
            # Keep whatever du managed to print before it failed.
            out = e.output
        print("Directory contents after %s" % (context,))
        print(out)

    def _update_after_test(self):
        """Take the after-test samples and store the per-test differences.

        Sets ``self._fileleak`` (-1 when the sample fails) and
        ``self._memoryleak`` (0 when the sample fails).
        """
        # The predefined hooks stopTest() and afterTest() cannot be used
        # because they get called after addError/addFailure/addSuccess
        subprocess.call(
            self.lsof + ' -p ' + str(self._pid)
            + ' | grep -i nosedir >> ListOpenFiles',
            shell=True)

        open_files = self._sample(self._open_files_command(), None)
        if open_files is None:
            self._fileleak = -1
        else:
            self._fileleak = open_files - int(self._openfiles)

        resident = self._sample(self._resident_memory_command(), None)
        if resident is None:
            self._memoryleak = 0
        else:
            self._memoryleak = resident - int(self._resident_memory)

    @staticmethod
    def _split_test_id(test_id):
        """Split a dotted test id into ``(class path, test name)``.

        An id without a dot yields an empty class path and the whole id as
        the test name.
        """
        cls, _, test_name = test_id.rpartition('.')
        return cls, test_name

    def _append_testcase(self, test, taken, outcome=None, err=None):
        """Append one ``<testcase>`` element for ``test`` to the error list.

        :param test: the test object; only ``test.id()`` is used.
        :param taken: elapsed time of the test in seconds.
        :param outcome: ``'error'`` or ``'failure'`` to add a child element
            with the traceback, or ``None`` for a passing test.
        :param err: ``sys.exc_info()``-style triple; required when
            ``outcome`` is given.
        """
        # Imported locally so that this class does not depend on a
        # file-level import being present.
        from xml.sax.saxutils import escape, quoteattr

        cls, test_name = self._split_test_id(test.id())
        pieces = ['<testcase classname=%s name=%s time="%.3f">'
                  % (quoteattr(cls), quoteattr(test_name), taken)]
        if outcome is not None:
            tb = ''.join(traceback.format_exception(*err))
            # Tracebacks routinely contain "<module>" and similar text, so
            # they must be escaped to keep the report well-formed.
            pieces.append('<%s type=%s>%s</%s>'
                          % (outcome, quoteattr(nice_classname(err[0])),
                             escape(tb), outcome))
        pieces.append(write_message(self._fileleak, self._memoryleak))
        pieces.append('</testcase>')
        self.errorlist.extend(pieces)

    def addError(self, test, err, capt=None):
        """Add error output to Xunit report.

        A ``SkipTest`` error only increments the skipped counter; no
        testcase entry is written for it.
        """
        self._update_after_test()
        taken = time.time() - self._timer
        if issubclass(err[0], nose.exc.SkipTest):
            self.stats['skipped'] += 1
            return
        self.stats['errors'] += 1
        self._append_testcase(test, taken, 'error', err)

    def addFailure(self, test, err, capt=None, tb_info=None):
        """Add failure output to Xunit report.
        """
        self._update_after_test()
        taken = time.time() - self._timer
        self.stats['failures'] += 1
        self._append_testcase(test, taken, 'failure', err)

    def addSuccess(self, test, capt=None):
        """Add success output to Xunit report.
        """
        self._update_after_test()
        taken = time.time() - self._timer
        self.stats['passes'] += 1
        self._append_testcase(test, taken)