Source
# result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, cwd=self.mypath + "../work")
import subprocess
import os, sys
import asyncio
# Streaming logic from here:
# https://kevinmccarthy.org/2016/07/25/streaming-subprocess-stdin-and-stdout-with-asyncio-in-python/
async def _read_stream(stream, cb):
while True:
line = await stream.readline()
if line:
cb(line)
else:
break
async def _stream_subprocess(cmd, stdout_cb, stderr_cb, cwd):
process = await asyncio.create_subprocess_exec(*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=cwd)
if sys.version_info <= (3,11):
await asyncio.wait([
_read_stream(process.stdout, stdout_cb),
_read_stream(process.stderr, stderr_cb)
])
else:
await asyncio.gather(
_read_stream(process.stdout, stdout_cb),
_read_stream(process.stderr, stderr_cb)
)
return await process.wait()
def execute(cmd, to,stdout_cb, stderr_cb, cwd):
loop = asyncio.get_event_loop()
rc = loop.run_until_complete(asyncio.wait_for(
_stream_subprocess(
cmd,
stdout_cb,
stderr_cb,
cwd
), timeout=to))
#loop.close()
return rc
class ShellRunner:
mypath = ""
def __init__(self):
self.mypath = os.path.dirname(os.path.realpath(__file__))
self.failed_tests = []
self.test_suite_passed = False
def process_stdout(self, x):
message = x.decode('utf-8').rstrip("\n\r")
print("%s" % message)
def process_stderr(self, x):
message = x.decode('utf-8').rstrip("\n\r")