当前位置 主页 > 服务器问题 > win服务器问题汇总 >

    Python 利用邮件系统完成远程控制电脑的实现(关机、重启等)

    栏目:win服务器问题汇总 时间:2019-11-19 17:54

    0. 我们如何通过邮件系统完成远程控制电脑(关机、重启等)?

    实现思路:

    需要有两个邮箱:接收指令邮箱(A)发送指令邮箱(B)

    被控制的电脑(查看 A 邮箱):
    1. 每隔指定时间监听 A 邮箱 查看最近的邮件
    2. 打开最近邮件,看是否是 B 邮箱地址发来的执行命令
    3. 向 A 和 B 邮箱都发送 主题为:反馈 内容为:“目标计算机已收到指令!开始执行:xxx 指令!” 邮件
    4. 执行指定邮箱发送的执行命令

    远程人员操作:

    登录 B 邮箱,向 A 邮箱发送 主题为:“目标计算机执行” 内容为:xxx指令(例如:关机)

    1. 代码编写

    import poplib
    import smtplib
    import os
    import time
    from email.parser import Parser
    from email.header import decode_header
    from email.utils import parseaddr
    from email.mime.text import MIMEText
    from email.header import Header
     
    email_params = dict()
    email_params['email_message'] = '反馈'
    email_params['email_subject'] = '目标计算机反馈'
    email_params['email_from'] = 'A邮箱'
    email_params['email_user'] = 'A邮箱'
    email_params['email_exec'] = 'B邮箱'
    email_params['email_to'] = ['A邮箱', 'B邮箱']
    email_params['email_smtp'] = 'smtp.163.com'
    email_params['email_pop'] = 'pop.163.com'
    email_params['email_pass'] = 'A邮箱的密码(smtp、pop3的密码)'
    interval_time = 5
     
     
    def decode_str(s):
      value, charset = decode_header(s)[0]
      if charset:
        value = value.decode(charset)
      return value
     
     
    def get_headers(message):
      headers = dict()
      for header in ['From', 'To', 'Subject']:
        value = message.get(header, '')
        if value:
          if header == 'Subject':
            headers['Subject'] = decode_str(value)
          elif header == 'From':
            hdr, addr = parseaddr(value)
            headers['From'] = decode_str(addr)
          elif header == 'To':
            hdr, addr = parseaddr(value)
            headers['To'] = decode_str(addr)
      return headers
     
     
    def guess_charset(message):
      charset = message.get_charset()
      if charset is None:
        content_type = message.get('Content-Type', '').lower()
        pos = content_type.find('charset=')
        if pos >= 0:
          charset = content_type[pos + 8:].strip()
      return charset
     
     
    def get_content(message):
      for part in message.walk():
        content_type = part.get_content_type()
        charset = guess_charset(part)
        if content_type == 'text/plain' and charset:
          try:
            return part.get_payload(decode=True).decode(charset)
          except AttributeError:
            print('type error')
          except LookupError:
            print("unknown encoding: utf-8")
     
     
    def receive_email():
      email_server = poplib.POP3_SSL(email_params['email_pop'])
      email_server.user(email_params['email_user'])
      email_server.pass_(email_params['email_pass'])
      resp, mails, octets = email_server.list()
      index = len(mails)
      if index == 0:
        return None, None
      resp, lines, octets = email_server.retr(index)
      message = b'\r\n'.join(lines).decode('utf-8', 'ignore')
      message = Parser().parsestr(message)
      email_server.quit()
      return get_headers(message), get_content(message)
     
     
    def send_email(email_message):
      message = MIMEText(email_message, 'plain', 'utf-8')
      message['Subject'] = Header(email_params['email_subject'], 'utf-8')
      message['From'] = Header(email_params['email_from'])
      message['To'] = Header(','.join(email_params['email_to']), 'utf-8')
     
      email_server = smtplib.SMTP_SSL(email_params['email_smtp'])
      email_server.login(email_params['email_from'], email_params['email_pass'])
      email_server.sendmail(email_params['email_from'], email_params['email_to'], message.as_string())
      email_server.close()
     
     
    def exec_operator(exec_content):
      if exec_content == '关机':
        os.system("shutdown -s -t 1")
      elif exec_content == '重启':
        os.system("shutdown -r")
      else:
        os.system(exec_content)
     
     
    if __name__ == '__main__':
      while True:
        headers, content = receive_email()
        if headers and content and headers['From'] == email_params['email_exec'] and headers['Subject'] == '目标计算机执行':
          email_message = "目标计算机已接收到指令!开始执行: " + content + " 指令!"
          send_email(email_message)
          exec_operator(content)
        time.sleep(interval_time)