python如何调用shell(Python调用Shell的常见操作和实时输出)

时间:2023-06-21 15:02:32

作者:admin

来源:系统助手

# 第一种,使用os.system方法,这种方法无法获取输出结果

import osos.system('ls')

# 第二种,使用os.popen方法,这种方法可以获取输出

import osstream = os.popen('echo 12345') #popen也有不同版本output = stream.read()print(output) #输出12345

# 第三种,使用subprocess模块

class subprocess.Popen(args,bufsize=-1,shell=False,stdin=None,stdout=None,stderr=None)'''args:对应的shell命令bufsize:缓冲区大小,0表示不使用缓冲区shell:若该参数为True,将使用操作系统的shell执行命令stdin,stdout,stderr分别表示程序的标准输入,输出,错误句柄-poll():检查进程是否终止,终止则返回returncode,否则None-wait(timeout):等待子进程终止-communicate(timeout):和子进程交互-terminate():停止子进程==send_signal(SIGTERM)-kill():杀死子进程'''# 下面是范例# 获取实时输出import subprocessimport shlexdef real_run_command(command):process =subprocess.Popen(shlex.split(command),stdout = subprocess.PIPE) #同时使用了subprocess.PIPE 作为参数,这个是一个特殊值,用于表明这些通道要开放while True:output = process.stdout.readline()if output == b"" and process.poll() is not None: # 因为输出的字节流,所以要用b''或者bytes.decode(str,errors='ignore')breakif output:print(output.strip())rc = process.poll()return rc#调用rc = real_run_command(sh test.sh)print("return code=%s"%rc)

第四种:ssh实时输出

#SSH

Python 执行远程主机可以使用 paramiko 框架,但 paramiko 框架的 exec_command 方法, 默认是没有开启 bufsize 的, 也就是说必须等到一个命令执行完, 我们才可以打印到命令的输出信息, 但为了体验更接近在终端执行的感觉, 实时输出就很有必要了。我这里的需求是 websockets 实时输出远程命令的日志信息,所以我只需要定义 command 和下面的 callback 函数就可以了。

Paramiko 的 exec_command 方法提供了 bufsize 参数, 我们可以调小缓冲区, 然后使程序更快的打满缓冲区生成缓冲块的方式, 来实现实时输出。我们对SSHClient 简单封装一下, 增加一个 run 的方法。

Python

from itertools import izip_longestfrom paramiko import SSHClientfrom paramiko import AutoAddPolicyclass MySSHClient(SSHClient): def run(self, command, callback): stdin, stdout, stderr = self.exec_command( command, bufsize=1 ) stdout_iter = iter(stdout.readline, '') stderr_iter = iter(stderr.readline, '') for out, err in izip_longest(stdout_iter, stderr_iter): if out: callback(out.strip()) if err: callback(err.strip()) return stdin, stdout, stderr

使用方式和原生的 SSHClient 一样, 只不过不去调用 exec_command 方法了, 改为调用 run 方法.

Python

def console(text): print(text)ssh = MySSHClient()ssh.set_missing_host_key_policy(AutoAddPolicy())ssh.connect("IPADDRESS", 22, "USER", "PASSWORD")stdin, stdout, stderr = ssh.run("python -u test.py", console)print stderr.channel.recv_exit_status()

系统下载排行榜71011xp

提取码
XGZS
关闭 前往下载