当前位置 博文首页 > Django实现聊天机器人

    Django实现聊天机器人

    作者:大江狗 时间:2021-08-12 17:50

    目录
    • 实现原理
    • 第一步 安装环境依赖
    • 第二步 配置Celery
    • 第三步 编写机器人聊天主页面
    • 第四步 编写后台websocket路由及处理方法
    • 第五步 编写Celery异步任务
    • 第六步 运行看效果
    • 小结

    演示效果如下所示:

    实现原理

    用户在聊天界面调用Celery异步任务,Celery异步任务执行完毕后发送结果给channels,然后channels通过websocket将结果实时推送给用户。对于简单的算术运算,Celery一般自行计算就好了。对于网上查找诗人简介这样的任务,Celery会调用Python爬虫(requests+parsel)爬取古诗文网站上的诗人简介,把爬取结果实时返回给用户。

    接下来我们来看下具体的代码实现吧。

    第一步 安装环境依赖

    首先在虚拟环境中安装django和以下主要项目依赖。本项目使用了最新版本,为3.X版本。

     # 主要项目依赖
     pip install django
     pip install channels
     pip install channels_redis
     pip install celery
     pip install redis
     pip install eventlet # windows only
    
     # 爬虫依赖
     pip install requests
     pip install parsel

     新建一个名为myproject的项目,新建一个app名为bots。如果windows下安装报错,如何解决自己网上去找吧,很容易解决。修改settings.py, 将channels和chat加入到INSTALLED_APPS里,并添加相应配置,如下所示:

     INSTALLED_APPS = [
           'django.contrib.admin',
           'django.contrib.auth',
           'django.contrib.contenttypes',
           'django.contrib.sessions',
           'django.contrib.messages',
           'django.contrib.staticfiles',
           'channels', # channels应用     
           'bots', # bots应用
        ]
    
     # 设置ASGI应用
     ASGI_APPLICATION = 'myproject.asgi.application'
    
    # 生产环境中使用redis做后台,安装channels_redis
    import os
    CHANNEL_LAYERS = {
        "default": {
            "BACKEND": "channels_redis.core.RedisChannelLayer",
            "CONFIG": {
                "hosts": [os.environ.get('REDIS_URL', 'redis://127.0.0.1:6379/2')],
            },
        },
    }

    最后将bots应用的urls.py加入到项目urls.py中去,这和常规Django项目无异。

     # myproject/urls.py
     from django.conf.urls import include
     from django.urls import path
     from django.contrib import admin
     
     urlpatterns = [
         path('bots/', include('bots.urls')),
         path('admin/', admin.site.urls),
     ]

    第二步 配置Celery

    pip安装好Celery和redis后,我们要对其进行配置。分别修改myproject目录下的__init__.py和celery.py(新建), 添加如下代码:

    # __init__.py
    from .celery import app as celery_app
    __all__ = ('celery_app',)
    
    # celery.py
    import os
    from celery import Celery
    
    # 设置环境变量
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
    # 实例化
    app = Celery('myproject')
    
    # namespace='CELERY'作用是允许你在Django配置文件中对Celery进行配置
    # 但所有Celery配置项必须以CELERY开头,防止冲突
    app.config_from_object('django.conf:settings', namespace='CELERY')
    
    # 自动从Django的已注册app中发现任务
    app.autodiscover_tasks()
    
    # 一个测试任务
    @app.task(bind=True)
    def debug_task(self):
        print(f'Request: {self.request!r}')

    接着修改settings.py, 增加如下Celery配置:

    # Celery配置
    CELERY_BROKER_URL = "redis://127.0.0.1:6379/0"
    CELERY_TIMEZONE = TIME_ZONE
    
    # celery内容等消息的格式设置,默认json
    CELERY_ACCEPT_CONTENT = ['application/json', ]
    CELERY_TASK_SERIALIZER = 'json'
    CELERY_RESULT_SERIALIZER = 'json'

    完整Celery配置见:Django进阶:万字长文教你使用Celery执行异步和周期性任务(多图)

    第三步 编写机器人聊天主页面

    本例我们只需要利用django普通视图函数编写1个页面,用于展示首页(index)与用户交互的聊天页面。这个页面对应的路由及视图函数如下所示:

     # bots/urls.py
     from django.urls import path
     from . import views
     
     urlpatterns = [
         path('', views.index, name='index'),
     ]
     
     # bots/views.py
     from django.shortcuts import render
     
     def index(request):
         return render(request, 'bots/index.html', {})

    接下来我们编写模板文件index.html,它的路径位置如下所示:

     bots/
         __init__.py
         templates/
             bots/
                 index.html
         urls.py
         views.py

    index.html内容如下所示。

    <!DOCTYPE html>
    <html>
    <head>
        <meta charset="utf-8"/>
        <title>Django+Channels+Celery聊天机器人</title>
    </head>
    <body>
    
    <textarea  cols="100" rows="20" readonly></textarea>
    <br/>
    <input  type="text" size="100" 
          placeholder="输入`help`获取帮助信息."/><br/><input  type="button" value="Send"/>
       <script>
        var wss_protocol = (window.location.protocol == 'https:') ? 'wss://': 'ws://';
        var chatSocket = new WebSocket(
            wss_protocol + window.location.host + '/ws/bots/'
            );
    
        chatSocket.onopen = function(e) {
    document.querySelector('#chat-log').value +=
    ('欢迎来到大江狗Django聊天机器人. 请输入`help`获取帮助信息.\n')}
    
        chatSocket.onmessage = function(e) {
            var data = JSON.parse(e.data);
            var message = data['message'];
            document.querySelector('#chat-log').value += (message + '\n');
        };
    
        chatSocket.onclose = function(e) {
    document.querySelector('#chat-log').value +=
    ('Socket closed unexpectedly, please reload the page.\n')};
    
        document.querySelector('#chat-message-input').focus();
        document.querySelector('#chat-message-input').onkeyup = function(e) {
            if (e.keyCode === 13) {  // enter, return
                document.querySelector('#chat-message-submit').click();
            }
        };
    
        document.querySelector('#chat-message-submit').onclick = function(e) {
            var messageInputDom = document.querySelector('#chat-message-input');
            var message = messageInputDom.value;
            chatSocket.send(JSON.stringify({
                'message': message
            }));
         messageInputDom.value = '';
        };
    </script>
    
    </body>
    </html>

    第四步 编写后台websocket路由及处理方法

    当 channels 接受 WebSocket 连接时, 它也会根据根路由配置去查找相应的处理方法。只不过channels的websocket路由不在urls.py中配置,处理函数也不写在views.py。在channels中,这两个文件分别变成了routing.py和consumers.py。

    在bots应用下新建routing.py, 添加如下代码。它的作用是将发送至ws/bots/的websocket请求转由BotConsumer处理。

    from django.urls import re_path
    
    from . import consumers
    
    websocket_urlpatterns = [
        re_path(r'ws/bots/$', consumers.BotConsumer.as_asgi()),
    ]

    注意:定义websocket路由时,推荐使用常见的路径前缀 (如/ws) 来区分 WebSocket 连接与普通 HTTP 连接, 因为它将使生产环境中部署 Channels 更容易,比如nginx把所有/ws的请求转给channels处理。

    与Django类似,我们还需要把这个app的websocket路由加入到项目的根路由中去。编辑myproject/asgi.py, 添加如下代码:

    # myproject/asgi.py
    import os
    
    from channels.auth import AuthMiddlewareStack
    from channels.routing import ProtocolTypeRouter, URLRouter
    from django.core.asgi import get_asgi_application
    import chat.routing
    import bots.routing
    
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myproject.settings")
    
    application = ProtocolTypeRouter({
        "http": get_asgi_application(),
        # websocket请求使用的路由
        "websocket": AuthMiddlewareStack(
            URLRouter(
                bots.routing.websocket_urlpatterns
            )
        )
    })

    接下来在bots应用下新建consumers.py, 添加如下代码:

    import json
    from asgiref.sync import async_to_sync
    from channels.generic.websocket import WebsocketConsumer
    
    from . import tasks
    
    COMMANDS = {
        'help': {
            'help': '命令帮助信息.',
        },
        'add': {
            'args': 2,
            'help': '计算两个数之和, 例子: `add 12 32`.',
            'task': 'add'
        },
        'search': {
            'args': 1,
            'help': '通过名字查找诗人介绍,例子: `search 李白`.',
            'task': 'search'
        },
    }
    
    
    
    class BotConsumer(WebsocketConsumer):
        def receive(self, text_data):
            text_data_json = json.loads(text_data)
            message = text_data_json['message']
    
            response_message = '请输入`help`获取命令帮助信息。'
            message_parts = message.split()
            if message_parts:
                command = message_parts[0].lower()
                if command == 'help':
                    response_message = '支持的命令有:\n' + '\n'.join(
                        [f'{command} - {params["help"]} ' for command, params in COMMANDS.items()])
                elif command in COMMANDS:
                    if len(message_parts[1:]) != COMMANDS[command]['args']:
                        response_message = f'命令`{command}`参数错误,请重新输入.'
                    else:
                        getattr(tasks, COMMANDS[command]['task']).delay(self.channel_name, *message_parts[1:])
                        response_message = f'收到`{message}`任务.'
                        
            async_to_sync(self.channel_layer.send)(
                self.channel_name,
                {
                    'type': 'chat.message',
                    'message': response_message
                }
            )
    
        def chat_message(self, event):
            message = event['message']
    
            # Send message to WebSocket
            self.send(text_data=json.dumps({
                'message': f'[机器人]: {message}'
            }))

    上面代码中最重要的一行如下所示。BotConsumer在接收到路由转发的前端消息后,对其解析,将当前频道名和解析后的参数一起交由Celery异步执行。Celery执行任务完成以后会将结果发到这个频道,这样就实现了channels和Celery的通信。

    getattr(tasks, COMMANDS[command]['task']).delay(self.channel_name, *message_parts[1:])

    第五步 编写Celery异步任务

    在bots目录下新建`tasks.py`,添加如下代码:

    from asgiref.sync import async_to_sync
    from celery import shared_task
    from channels.layers import get_channel_layer
    from parsel import Selector
    import requests
    
    channel_layer = get_channel_layer()
    
    @shared_task
    def add(channel_name, x, y):
        message = '{}+{}={}'.format(x, y, int(x) + int(y))
        async_to_sync(channel_layer.send)(channel_name, {"type": "chat.message", "message": message})
        print(message)
    
    @shared_task
    def search(channel_name, name):
        spider = PoemSpider(name)
        result = spider.parse_page()
        async_to_sync(channel_layer.send)(channel_name, {"type": "chat.message", "message": str(result)})
        print(result)
    
    class PoemSpider(object):
        def __init__(self, keyword):
            self.keyword = keyword
            self.url = "https://so.gushiwen.cn/search.aspx"
            
        def parse_page(self):
            params = {'value': self.keyword}
            response = requests.get(self.url, params=params)
            if response.status_code == 200:
                # 创建Selector类实例
                selector = Selector(response.text)
                # 采用xpath选择器提取诗人介绍
                intro = selector.xpath('//textarea[starts-with(@id,"txtareAuthor")]/text()').get()
                print("{}介绍:{}".format(self.keyword, intro))
                if intro:
                    return intro
    
            print("请求失败 status:{}".format(response.status_code))
            return "未找到诗人介绍。"

    以上两个任务都以channel_name为参数,任务执行完毕后通过channel_layer的send方法将结果发送到指定频道。

    注意:

    - 默认获取channel_layer的方式是调用接口:channels.layers.get_channel_layer()。如果是在consumer中调用接口的话可以直接使用self.channel_layer。

    - 对于channel layer的方法(包括send()、group_send(),group_add()等)都属于异步方法,这意味着在调用的时候都需要使用await,而如果想要在同步代码中使用它们,就需要使用装饰器asgiref.sync.async_to_sync

    第六步 运行看效果

    如果不出意外,你现在的项目布局应该如下所示。说实话,整个项目一共没几个文件,Python的简洁和效率真是出了名的好啊。

    连续运行如下命令,就可以看到我们文初的效果啦。

     # 启动django测试服务器
     python manage.py makemigrations
     python manage.py migrate
     python manage.py runserver
     
     # windows下启动Celery需eventlet
     # 启动Celery前确定redis服务已开启哦
     Celery -A myproject worker -l info -P eventlet

    小结

    本文我们使用Django + Channels + Celery + Redis打造了一个聊天机器人,既会算算术,还会查古诗文。借用这个实现原理,你可以打造非常有趣的实时聊天应用哦,比如在线即时问答,在线客服,实时查询订单,Django版的siri美女等等。

    Django Channels + Websocket + Celery聊天机器人项目源码地址:https://github.com/shiyunbo/django-channels-chatbot

    jsjbwy