在好例子网,分享、交流、成长!
您当前所在位置:首页Python 开发实例Python网络编程 → python 微信机器人代码

python 微信机器人代码

Python网络编程

下载此实例
  • 开发语言:Python
  • 实例大小:0.02M
  • 下载次数:95
  • 浏览次数:2403
  • 发布时间:2016-03-17
  • 实例类别:Python网络编程
  • 发 布 人:hareqiqi
  • 文件格式:.py
  • 所需积分:5
 相关标签: 微信 机器人

同类人气实例

实例介绍

【实例简介】

Python实现的微信机器人

【实例截图】

【核心代码】

# coding=utf-8

import os
import requests
import simplejson as json
import time
import re
import xml.dom.minidom
# from collections import deque
import queue
import html

import threading
# lock = threading.Lock()

DEBUG = False
LOG = True

deviceId = 'e000000000000000' 
g_info = {}
g_info['tip'] = 0
g_queue = queue.Queue()# []

import config
apikey = config.apikey

def getUUID():
    global g_info

    url = 'https://login.weixin.qq.com/jslogin'
    params = {
        'appid': 'wx782c26e4c19acffb',
        'fun': 'new',
        'lang': 'zh_CN',
        '_': int(time.time()),   
    }

    r = requests.post(url, params, verify=False)
    if DEBUG:
        print (r.text)
    text = r.text
    regx = r'window.QRLogin.code = (\d ); window.QRLogin.uuid = "(\S ?)"'
    pm = re.search(regx, text)

    code = pm.group(1)
    uuid = pm.group(2)

    g_info['uuid'] = uuid
    if code == '200':
        return True 

    return False

def showQRImage():
    global g_info

    uuid = g_info['uuid']
    url = 'https://login.weixin.qq.com/qrcode/'   uuid
    params = {
        't': 'webwx',
        '_': int(time.time()),
    }

    r = requests.post(url, params, verify=False)
    g_info['tip'] = 1

    with open('qrcode.jpg', 'wb') as fd:
        for chunk in r.iter_content(512):
            fd.write(chunk)

    print ('请扫描二维码。。。')

def waitForLogin():
    global g_info

    tip = g_info['tip']
    uuid = g_info['uuid']
    url = 'https://login.weixin.qq.com/cgi-bin/mmwebwx-bin/login?tip=%s&uuid=%s&_=%s' % (
        tip, uuid, int(time.time()))  

    r = requests.get(url, verify=False)
    text = r.text
    regx = r'window.code=(\d );'
    pm = re.search(regx, text)
    
    code = pm.group(1)
    if code == '201':
        print ('成功扫描,请在手机上点击确认以登录')
        g_info['tip'] = 0
    elif code == '200':
        print ('正在登录。。。')
        regx = r'window.redirect_uri="(\S ?)";'
        pm = re.search(regx, text)
        redirect_uri = pm.group(1)   '&fun=new'
        g_info['redirect_uri']  = redirect_uri
        base_uri = redirect_uri[:redirect_uri.rfind('/')]
        g_info['base_uri'] = base_uri

        services = [
            ('wx2.qq.com', 'webpush2.weixin.qq.com'),
            ('qq.com', 'webpush.weixin.qq.com'),
            ('web1.wechat.com', 'webpush1.wechat.com'),
            ('web2.wechat.com', 'webpush2.wechat.com'),
            ('wechat.com', 'webpush.wechat.com'),
            ('web1.wechatapp.com', 'webpush1.wechatapp.com'),
        ]

        push_uri = base_uri
        for (searchUrl, pushUrl) in services:
            if base_uri.find(searchUrl) >= 0:
                push_uri = 'https://%s/cgi-bin/mmwebwx-bin' % pushUrl
                break  
        g_info['push_uri'] = push_uri

    elif code == '408':
        pass

    return code

def login():
    global g_info

    redirect_uri = g_info['redirect_uri']
    r = requests.get(redirect_uri, verify=False)


    g_info['cookies'] = r.cookies

    doc = xml.dom.minidom.parseString(r.text)
    root = doc.documentElement

    for node in root.childNodes:
        if node.nodeName == 'skey':
            skey = node.childNodes[0].data
        elif node.nodeName == 'wxsid':
            wxsid = node.childNodes[0].data
        elif node.nodeName == 'wxuin':
            wxuin = node.childNodes[0].data
        elif node.nodeName == 'pass_ticket':
            pass_ticket = node.childNodes[0].data   

    if not all((skey, wxsid, wxuin, pass_ticket)):
        return False

    BaseRequest = {
        'Uin': int(wxuin),
        'Sid': wxsid,
        'Skey': skey,
        'DeviceID': deviceId,
    }

    g_info['skey'] = skey
    g_info['wxsid'] = wxsid
    g_info['wxuin'] = wxuin
    g_info['pass_ticket'] = pass_ticket
    g_info['BaseRequest'] = BaseRequest

    if DEBUG:
        print (skey, wxsid, wxuin)

    return True



def responseState(func, BaseResponse):
    ErrMsg = BaseResponse['ErrMsg']
    Ret = BaseResponse['Ret']
    if DEBUG:
        print('func: %s, Ret: %d, ErrMsg: %s' % (func, Ret, ErrMsg))

    if Ret != 0:
        return False

    return True

def webwxinit():
    global g_info

    base_uri = g_info['base_uri']
    pass_ticket = g_info['pass_ticket']
    skey = g_info['skey']
    BaseRequest = g_info['BaseRequest']

    url = base_uri   \
        '/webwxinit?pass_ticket=%s&skey=%s&r=%s' % (
            pass_ticket, skey, int(time.time()))
    params = {
        'BaseRequest': BaseRequest
    }

    r = requests.post(url, json.dumps(params), verify=False)
    text = r.text

    if DEBUG:
        with open('webwxinit.json', 'wb') as fd:
            for chunk in r.iter_content(512):
                fd.write(chunk)

    dic = json.loads(text)
    ContactList = dic['ContactList']
    My = dic['User']
    SyncKey = dic['SyncKey']

    g_info['ContactList'] = ContactList
    g_info['My'] = My 
    g_info['SyncKey'] = SyncKey

    state = responseState('webwxinit', dic['BaseResponse'])
    return state

def webwxgetcontact():
    global g_info

    base_uri = g_info['base_uri']
    pass_ticket = g_info['pass_ticket']
    skey = g_info['skey']

    url = base_uri   \
        '/webwxgetcontact?pass_ticket=%s&skey=%s&r=%s' % (
            pass_ticket, skey, int(time.time()))
    headers = {'Content-Type':'application/json; charset=UTF-8'}
    r = requests.get(url, verify=False, headers=headers,cookies=g_info['cookies'])
    text = r.text

    if DEBUG:
        with open('webwxgetcontact.json', 'wb') as fd:
            for chunk in r.iter_content(512):
                fd.write(chunk)

    
    dic = json.loads(text, 'utf-8')
    MemberList = dic['MemberList']
    SpecialUsers = ["newsapp", "fmessage", "filehelper", "weibo", "qqmail", "tmessage", "qmessage", "qqsync", "floatbottle", "lbsapp", "shakeapp", "medianote", "qqfriend", "readerapp", "blogapp", "facebookapp", "masssendapp",
                    "meishiapp", "feedsapp", "voip", "blogappweixin", "weixin", "brandsessionholder", "weixinreminder", "wxid_novlwrv3lqwv11", "gh_22b87fa7cb3c", "officialaccounts", "notification_messages", "wxitil", "userexperience_alarm"]
    for i in range(len(MemberList) - 1, -1, -1):
        Member = MemberList[i]
        if Member['VerifyFlag'] & 8 != 0:  # 公众号/服务号
            MemberList.remove(Member)
        elif Member['UserName'] in SpecialUsers:  # 特殊账号
            MemberList.remove(Member)   
    
    _MemberList = {}
    for user in MemberList:
        _MemberList[user['UserName']] = user['NickName']

    g_info['MemberList'] = _MemberList


def syncKey():
    global g_info

    SyncKey = g_info['SyncKey']
    SyncKeyItems = ['%s_%s' % (item['Key'], item['Val'])
                    for item in SyncKey['List']]
    SyncKeyStr = '|'.join(SyncKeyItems)
    return SyncKeyStr

def syncCheck():
    global g_info

    push_uri = g_info['push_uri']
    BaseRequest = g_info['BaseRequest']
    url = push_uri   '/synccheck?'
    params = {
        'skey': BaseRequest['Skey'],
        'sid': BaseRequest['Sid'],
        'uin': BaseRequest['Uin'],
        'deviceId': BaseRequest['DeviceID'],
        'synckey': syncKey(),
        'r': int(time.time()),
    }

    r = requests.get(url, verify=False, params=params, cookies=g_info['cookies'], timeout=300)
    text = r.text
    regx = r'window.synccheck={retcode:"(\d )",selector:"(\d )"}'
    pm = re.search(regx, text)    

    retcode = pm.group(1)
    selector = pm.group(2)

    if DEBUG or LOG:
        print (text)

    return (retcode, selector)


def webwxsync():
    global g_info
    
    base_uri = g_info['base_uri']
    BaseRequest = g_info['BaseRequest']
    SyncKey = g_info['SyncKey']
    pass_ticket = g_info['pass_ticket']
    # url = base_uri   '/webwxsync?lang=zh_CN&skey=%s&sid=%s&pass_ticket=%s' % (
    #     BaseRequest['Skey'], BaseRequest['Sid'], quote_plus(pass_ticket))
    url = base_uri   '/webwxsync?'
    params = {
        'BaseRequest': BaseRequest,
        'SyncKey': SyncKey,
        'rr': ~int(time.time()),
    } 
    url_params = {
        # 'lang' : 'zh_CN' ,
        'skey' : BaseRequest['Skey'] ,
        'pass_ticket' : pass_ticket ,
        'sid' : BaseRequest['Sid']
    }
    headers = {
        'ContentType': 'application/json; charset=UTF-8'
    }
    r = requests.post(url, verify=False, params=url_params, data=json.dumps(params), headers = headers, cookies=g_info['cookies'])

    # if DEBUG:
    #     global i 
    #     i = i   1
    #     with open('sync'    str(i)   '.json', 'wb') as fd:
    #         for chunk in r.iter_content(512):
    #             fd.write(chunk)
    r.encoding = 'utf-8'
    dic = json.loads(r.text)
    # print (dic)
    # 更新 synckey
    g_info['SyncKey'] = dic['SyncKey']
    state = responseState('webwxsync', dic['BaseResponse'])

    msg_list = dic['AddMsgList']
    if DEBUG:
        print (msg_list)

    return (state, msg_list)


def getMsg(msg_list):
    global g_info, g_queue

    My = g_info['My']

    for msg in msg_list:
        if msg['MsgType'] != 1:
            continue
        if msg['FromUserName'] != My['UserName']:
            continue
        if msg['ToUserName'] == My['UserName']:
            response = {}
            content = msg['Content']

            # 群消息中 :<br/> 之前是 UserName
            if content.find(':<br/>') != -1:
                fromsomeone = content[:content.find(':<br/>')]
            else:
                fromsomeone = ''

            fromsomeone_NickName = ''
            if msg['FromUserName'].find('@') == -1:
                continue
            if msg['FromUserName'].find('@@') != -1:
                continue
                # 如果是来自群,那就试着去 g_info[] 对应的群中找群成员列表
                groupName = msg['FromUserName']
                # 群还没有记录
                if groupName not in g_info:
                    g_info['Group_UserName_Req'] = msg['FromUserName']
                # 在群列表中有了,因为可能群成员会变化,所以要再次找一遍
                elif fromsomeone in g_info[groupName]:
                    fromsomeone_NickName = g_info[groupName][fromsomeone]
                    fromsomeone_NickName = '@'   fromsomeone_NickName   ' '
                # 找不到,所以置标志位,会在另一个群中触发寻找行为
                else:
                    g_info['Group_UserName_Req'] = msg['FromUserName']
            else:
                fromsomeone_NickName = ''

            response['fromsomeone'] = fromsomeone_NickName
            response['Content'] = content[content.find('>') 1:]
            response['FromUserName'] = msg['FromUserName']

            print ('机器人收到消息:%s' % response['Content'])

            # g_info['Group_UserName_Req'] = response['FromUserName']

            # 不停地塞新消息
            # lock.acquire()
            # try:
            #     g_queue.append(response)
            # finally:
            #     lock.release()
            # print (response['Content'])

            # test
            if g_queue.qsize() > 5:
                g_queue.get()
                g_queue.get()
            g_queue.put(response)

    if LOG:
        print ('getmsg queue: %s' % g_queue.qsize())


def webwxsendmsg(content, user):
    global g_info

    base_uri = g_info['base_uri']
    pass_ticket = g_info['pass_ticket']
    BaseRequest = g_info['BaseRequest']
    My = g_info['My']

    wxuin = BaseRequest['Uin']
    wxsid = BaseRequest['Sid']
    skey = BaseRequest['Skey']
    deviceId = BaseRequest['DeviceID']

    url = base_uri   \
    '/webwxsendmsg?pass_ticket=%s' % (pass_ticket)

    msgid = int(time.time()*10000000)
    msg = {
        'ClientMsgId' : msgid ,
        'Content' : content,
        'FromUserName' : My['UserName'] ,
        'LocalID' : msgid ,
        'ToUserName' : user,
        'Type' : 1
    }
    params = {
        'BaseRequest' : BaseRequest,
        'Msg' : msg
    }
    data = json.dumps(params, ensure_ascii=False)
    # print (data)
    data = data.encode('utf-8')
    r = requests.post(url, verify=False, data=data, cookies=g_info['cookies'])


def sendMsg():
    global g_info, g_queue

    MemberList = g_info['MemberList']
    tuling_url = 'http://www.tuling123.com/openapi/api?key='   apikey   '&info='

    time.sleep(1)
    if LOG:
        print ('sendmsg queue: %s' % g_queue.qsize())
    while g_queue.empty() is False:
        # lock.acquire()
        # try:
        #     response = g_queue.popleft()
        # finally:
        #     lock.release()
        response = g_queue.get()

        content = response['Content']
        from_user = response['FromUserName']
        AT = response['fromsomeone']   
        print
        print ('机器人处理消息:%s' % content)
        tuling_url = tuling_url   content
        try:
            data = requests.get(tuling_url, verify=False)
            data = json.loads(data.text)
            text = data['text']
        except:
            text = '网络异常。。。。。'          
        webwxsendmsg(AT   text, from_user)
        time.sleep(1)

        if LOG:
            print ('机器人的回复: %s' % text)
            print




def webwxbatchgetcontact(UserName):
    global g_info

    base_uri = g_info['base_uri']
    pass_ticket = g_info['pass_ticket']
    BaseRequest = g_info['BaseRequest']

    url = base_uri   '/webwxbatchgetcontact?'


    List = [{
        'ChatRoomId' : '',
        'UserName' : UserName
    }]
    
    post_params = {
        'BaseRequest': BaseRequest ,
        'Count' : 1 ,
        'List' : List
    } 
    url_params = {
        'lang' : 'zh_CN' ,
        'type' : 'ex' ,
        'pass_ticket' : pass_ticket ,
        'r' : int(time.time())
    }

    headers = {
        'ContentType': 'application/json; charset=UTF-8'
    }
    r = requests.post(url, verify=False, params=url_params, data=json.dumps(post_params), headers = headers, cookies=g_info['cookies'])

    r.encoding = 'utf-8'
    dic = json.loads(r.text)    


    GroupMapUsers = {}
    ContactList = dic['ContactList']
    for contact in ContactList:
        memberlist = contact['MemberList']
        for member in memberlist:
            # 默认 @群名片,没有群名片就 @昵称
            nickname = member['NickName']
            displayname = member['DisplayName']
            AT = ''
            if displayname == '':
                # 有些人的昵称会有表情 <span> 会表示成 &lt;span&gt;
                # 需要 html.unescape() 转义一下
                AT = html.unescape(nickname)
            else:
                AT = html.unescape(displayname)
            GroupMapUsers[member['UserName']] = AT

            if DEBUG:
                print (member['NickName'])

    # 整群的成员列表消息记录
    g_info[UserName] = GroupMapUsers




def getgroupinfo():
    global g_info

    if 'Group_UserName_Req' not in g_info:
        return
    if g_info['Group_UserName_Req'] == '0':
        return

    Group_UserName = g_info['Group_UserName_Req']
    webwxbatchgetcontact(Group_UserName)

    # 这个变量表示一次 获取群成员列表 请求。请求完毕置空
    g_info['Group_UserName_Req'] = '0'

    time.sleep(0.5)


    
def heartBeatLoop():
    while True:
        retcode, selector = syncCheck()
        if retcode != '0':
            print ('sync 失败。。。')
        if selector != '0':
            state, msg_list = webwxsync()
            getMsg(msg_list)
            getgroupinfo()

        time.sleep(1)



def main():
    global g_info

    if not getUUID():
        print ('获取 uuid 失败')
        return
    print ('获取二维码图片中。。。')
    showQRImage()
    time.sleep(1)

    while waitForLogin() != '200':
        pass

    if not login():
        print ('登陆失败')
        return

    if not webwxinit():
        print ('初始化失败')
        return

    print ('登陆')

    print ('获取好友。。。。')
    webwxgetcontact()

    print ('开始心跳 噗咚噗通')
    t1 = threading.Thread(target=heartBeatLoop)
    t1.start()

    MemberCount = len(g_info['MemberList'])
    print ('这位同志啊,你有 %s 个好友' % MemberCount)

    try:
        while True:
            sendMsg()
            time.sleep(1.5)
    except KeyboardInterrupt:
        print ('bye bye ~')

if __name__ == '__main__':
    main()

标签: 微信 机器人

实例下载地址

python 微信机器人代码

不能下载?内容有错? 点击这里报错 + 投诉 + 提问

好例子网口号:伸出你的我的手 — 分享

网友评论

发表评论

(您的评论需要经过审核才能显示)

查看所有0条评论>>

小贴士

感谢您为本站写下的评论,您的评论对其它用户来说具有重要的参考价值,所以请认真填写。

  • 类似“顶”、“沙发”之类没有营养的文字,对勤劳贡献的楼主来说是令人沮丧的反馈信息。
  • 相信您也不想看到一排文字/表情墙,所以请不要反馈意义不大的重复字符,也请尽量不要纯表情的回复。
  • 提问之前请再仔细看一遍楼主的说明,或许是您遗漏了。
  • 请勿到处挖坑绊人、招贴广告。既占空间让人厌烦,又没人会搭理,于人于己都无利。

关于好例子网

本站旨在为广大IT学习爱好者提供一个非营利性互相学习交流分享平台。本站所有资源都可以被免费获取学习研究。本站资源来自网友分享,对搜索内容的合法性不具有预见性、识别性、控制性,仅供学习研究,请务必在下载后24小时内给予删除,不得用于其他任何用途,否则后果自负。基于互联网的特殊性,平台无法对用户传输的作品、信息、内容的权属或合法性、安全性、合规性、真实性、科学性、完整权、有效性等进行实质审查;无论平台是否已进行审查,用户均应自行承担因其传输的作品、信息、内容而可能或已经产生的侵权或权属纠纷等法律责任。本站所有资源不代表本站的观点或立场,基于网友分享,根据中国法律《信息网络传播权保护条例》第二十二与二十三条之规定,若资源存在侵权或相关问题请联系本站客服人员,点此联系我们。关于更多版权及免责申明参见 版权及免责申明

;
报警