chatGPT,chatgpt是什么

ai资讯1年前 (2023)发布 wan
169 0

本文介绍一下某米音箱如何接入目前大火的ChatGPT ,本文的主要内容是整理Github上yihong0618作者的代码而来,经过归纳整理,并对代码做了少量精简和修改。这里对yihong0618表示感谢,希望大家都能成功的改造自己的音箱。

某米音箱是一个开发的平台,支持个人和企业开发者定制音频产品和定制化操作,在官网可以查看具体的教程。

下面开始详细介绍如何接入chatgpt

系统架构

chatGPT,chatgpt是什么

你需要一个主机来编写程序,连接米家服务器,主机可以是你的个人电脑,树莓派等Arm电脑,或者云端主机,我的测试中使用的是树莓派4B,编程语言是Python,推荐使用云服务器这样可以让你的程序保持持续在线。

主机负责从米家服务上查询你的问答,将问答传输给ChatGPT后,将GPT的内容通过智能音箱来进行播放。

查看你的账户ID

登陆米家官网,登陆你的账号后在菜单栏会显示你的账号ID,这个ID也可以在APP的个人信息里找到,记录下你的账户名和密码。

chatGPT,chatgpt是什么

获取你的账户ID

获取你的机器型号

在你的智能音箱的底部的标签上找到你的机器型号,我的机器是Pro 2代,在小米的底部标签上可以找到如下内容

产品名称:智能音箱
型号: L15A

记录下你的型号我这里是L15A

安装Python库miservice

建议不要直接使用pip3 install miservice这种形式安装,这种安装方式的版本低一些,少几个查询智能音箱当前状态的函数。

在Github上搜索MiService库,作者是yihong0608,将代码clone下来,使用如下方式安装

git clone https://github.com/yihong0618/MiService.git
cd  MiService
sudo pip3 install .

库中介绍了一些查询你的机器信息和使用智能音箱文字转音频的命令,需要的同学,也可以体验一下,取决你的智能音箱型号,很多命令可能是无效的。

同时重要的是你需要获取你的智能音箱的tts文本转语音命令,这里有一个列表,如果你的型号恰好在里面,那么你直接记录一下命令即可,不需要使用后续的命令获得

   "LX06": "5-1",
    "L05B": "5-3",
    "S12A": "5-1",
    "LX01": "5-1",
    "L06A": "5-1",
    "LX04": "5-1",
    "L05C": "5-3",
    "L17A": "7-3",
    "X08E": "7-3",
    "LX05A": "5-1",  
    "LX5A": "5-1",  
    "L15A": "7-3",  

我的型号是L15A,所以我的tts命令是7-3

如果你的型号不在上面的列表中,使用下面的命令获得你的tts命令,上面介绍过我的型号是L15A,所以我在console中输入如下内容。

cd  MiService
export MI_USER= 你的小米ID
export MI_PASS=你的密码

python3 ./micli.py list
通过这个命令可以显示自己账号下的设备列表,包含名称、类型、DID、Token 等信息,记录下的你的
智能音箱的ID
export MI_DID=你的智能音箱的ID
python3 ./micli.py spec xiaomi.wifispeaker.l15a

注意将xiaomi.wifispeaker.l15a中的l15a替换为你的型号,在输出中获取我的音箱命令,如下

chatGPT,chatgpt是什么

我的音箱命令

从音箱命令中找到Intelligent_Speacker后的数字我的是7,找到_Play_Text后的数字我的是3

这样组成了下面代码config.py中的"L15A": "7-3"。所以注意这里一定要找到你的命令代码。

将你的型号和tts命令添加打config.py中,如我上面列表中那样。

完整代码

#!/usr/bin/env python3
import asyncio
import json
import re
import subprocess
import time
from pathlib import Path
from aiohttp import ClientSession
from miservice import MiAccount, MiNAService
from wxtgptbot import  WXTchatbot 
from config import (
    COOKIE_TEMPLATE,
    HARDWARE_COMMAND_DICT,
    KEY_WORD,
    LATEST_ASK_API,
    MI_ASK_SIMULATE_DATA,
    PROMPT,
)
from utils import calculate_tts_elapse, parse_cookie_string
class Migpt:
    def __init__(
        self,
        hardware,
        mi_user,
        mi_pass,
        openai_key,
        cookie="",
        use_command=False,
        mute_xiaoai=False,
        use_gpt3=False,
        use_chatgpt_api=False,
        api_base=None,
        verbose=False,
    ):
        self.mi_token_home = Path.home() / ".mi.token"
        self.hardware = hardware
        self.mi_user = mi_user
        self.mi_pass = mi_pass
        self.openai_key = openai_key
        self.cookie_string = ""
        self.last_timestamp = int(time.time()*1000)  # timestamp last call mi speaker
        self.session = None
        self.chatbot = None  # a little slow to init we move it after xiaomi init
        self.user_id = ""
        self.device_id = ""
        self.service_token = ""
        self.cookie = cookie
        self.use_command = use_command
        self.tts_command = HARDWARE_COMMAND_DICT.get(hardware, "7-3")
        self.conversation_id = None
        self.parent_id = None
        self.miboy_account = None
        self.mina_service = None
        # try to mute xiaoai config
        self.mute_xiaoai = mute_xiaoai
        # mute xiaomi in runtime
        self.this_mute_xiaoai = mute_xiaoai
        # if use gpt3 api
        self.use_gpt3 = use_gpt3
        self.use_chatgpt_api = use_chatgpt_api
        self.api_base = api_base
        self.verbose = verbose
        # this attr can be re set value in cli
        self.key_word = KEY_WORD
        self.prompt = PROMPT

    async def init_all_data(self, session):
        await self.login_miboy(session)
        await self._init_data_hardware()
        with open(self.mi_token_home) as f:
            user_data = json.loads(f.read())
        self.user_id = user_data.get("userId")
        self.service_token = user_data.get("micoapi")[1]
        self._init_cookie()
  
    async def login_miboy(self, session):
        self.session = session
        self.account = MiAccount(
            session,
            self.mi_user,
            self.mi_pass,
            str(self.mi_token_home),
        )
        # Forced login to refresh to refresh token
        await self.account.login("micoapi")
        self.mina_service = MiNAService(self.account)
    async def _init_data_hardware(self):
        if self.cookie:
            # if use cookie do not need init
            return
        hardware_data = await self.mina_service.device_list()
        for h in hardware_data:
            if h.get("hardware", "") == self.hardware:
                self.device_id = h.get("deviceID")
                print("设备id:",self.device_id)
                break
        else:
            raise Exception(f"we have no hardware: {self.hardware} please check")
    '''初始化cookie,调用小米api时需要'''
    def _init_cookie(self):
        if self.cookie:
            self.cookie = parse_cookie_string(self.cookie)
        else:
            self.cookie_string = COOKIE_TEMPLATE.format(
                device_id=self.device_id,
                service_token=self.service_token,
                user_id=self.user_id,
            )
            self.cookie = parse_cookie_string(self.cookie_string)

    #获取小米音箱的最后一次的回答
    async def get_latest_ask_from_xiaoai(self):
        r = await self.session.get(
            LATEST_ASK_API.format(
                hardware=self.hardware, timestamp=str(int(time.time() * 1000))
            ),
            cookies=parse_cookie_string(self.cookie),
        )
        return await r.json()

    def get_last_timestamp_and_record(self, data):
        if "data" in data:
            d= data.get("data")
            records = json.loads(d).get("records")
            if not records:
                return 0, None
            last_record = records[0]
            timestamp = last_record.get("time")
            return timestamp, last_record
        else:
             return 0, None
     
    async def do_tts(self, value, wait_for_finish=False):
        if not self.use_command:
            try:
                await self.mina_service.text_to_speech(self.device_id, value)
            except:
                # do nothing is ok
                pass
        else:
            #使用micli工具
            subprocess.check_output(["micli", self.tts_command, value])
        if wait_for_finish:
            elapse = calculate_tts_elapse(value)
            await asyncio.sleep(elapse)
            print("elapse:",elapse)
            while True:
                if not await self.get_if_xiaoai_is_playing():
                    break
                await asyncio.sleep(2)
            await asyncio.sleep(2)
            print("回答完毕")

    #小米是否正在播报
    async def get_if_xiaoai_is_playing(self):
        #此函数没有被找到
        playing_info = await self.mina_service.player_get_status(self.device_id)
        # WTF xiaomi api
        is_playing = (
            json.loads(playing_info.get("data", {}).get("info", "{}")).get("status", -1)
            == 1
        )
        return is_playing

    async def run_forever(self):
        print(f"Running xiaogpt now, 用`{'/'.join(KEY_WORD)}`开头来提问")
        async with ClientSession() as session:
            await self.init_all_data(session)
            print("开始循环")
            while 1:
                if self.verbose:
                    print(
                        f"Now listening xiaoai new message timestamp: {self.last_timestamp}"
                    )
                try:
                    r = await self.get_latest_ask_from_xiaoai()
                except Exception:
                    # we try to init all again
                    await self.init_all_dat(session)
                    r = await self.get_latest_ask_from_xiaoai()
                # spider rule
                if not self.mute_xiaoai:
                    await asyncio.sleep(1)
                else:
                    await asyncio.sleep(0.3)
               
                new_timestamp, last_record = self.get_last_timestamp_and_record(r)
                print(new_timestamp, last_record)
                if new_timestamp > self.last_timestamp:
                    self.last_timestamp = new_timestamp
                    query = last_record.get("query", "")
                    if query.startswith(tuple(self.key_word)):
                        # only mute when your clause start's with the keyword
                        self.this_mute_xiaoai = False
                        # drop 帮我回答
                        query = re.sub(rf"^({'|'.join(self.key_word)})", "", query)
                        print("-" * 20)
                        print("问题:" + query + "?")
                        query = f"{query},{PROMPT}"
                        # waiting for xiaoai speaker done
                        if not self.mute_xiaoai:
                            await asyncio.sleep(2)
                        for i in range(8):
                            if not await self.get_if_xiaoai_is_playing():
                                print("小米结束回答")
                                break
                            else:
                                print("小米正在回答")
                                await asyncio.sleep(2)
                        
                        await self.do_tts("正在问GPT请耐心等待")
                        await asyncio.sleep(0.5)
                        try:
                            print(
                                "以下是小爱的回答: ",
                                last_record.get("answers")[0]
                                .get("tts", {})
                                .get("text"),
                            )
                        except:
                            print("小爱没回")
                        # message = await self.ask_gpt(query)
                        message="以下是GPT的回答 "
                        if "清除消息" in query:
                            message="GPT 清除历史消息"
                            WXTChatBot.clear()
                        else:
                            message+=WXTChatBot.ask({"msg":query})
                      
                        # tts to xiaoai with ChatGPT answer
                        print("以下是GPT的回答: " + message)
                        await self.do_tts(message, wait_for_finish=True)
                        if self.mute_xiaoai:
                            self.this_mute_xiaoai = True
                else:
                    if self.verbose:
                        print("No new xiao ai record")                        
if __name__=="__main__":
    app=MiGPT("型号","你的ID","你的密码","")
    asyncio.run(app.run_forever())

这个代码中需要非常注意的代码时

message+=WXTChatBot.ask({"msg":query})

WXTChatBot 这个模块是我封装的访问chatgpt的代码,请按照下面的介绍封装一个你的模块。

wxtchatbot模块

import   requests
import  json
import time
class WXTChatBot():
    baseurl="http://我的服务器ID:13456/"
    @classmethod
    def ask(cls,msg):
        '''{"msg":"请做一首欧阳修风格的16字绝句"}'''
        t1=time.time()
        msg=json.dumps(msg,ensure_ascii=False)
        req = requests.post(WXTChatBot.baseurl+"ask",headers={"content-type":"application/json;charset=UTF-8"}, data=msg.encode('utf-8'),timeout=20)
        print("用时:",(time.time()-t1),'s')
        return req.content.decode("utf8")
    @classmethod
    def clear(cls):
        '''清空历史聊天'''
        req = requests.get(WXTChatBot.baseurl+"clearchat",timeout=3)

这里说明一下我的方式,由于我无法直接调用chatgpt api,所以我在一台云端服务器(可以直接访问chatgpt api),搭建了一个http服务,端口为13456,在服务器的ask接口中我使用openai提供的chatgpt例子调用了chatgpt,然后将内容返回给我。如果你能直接访问openai服务器,可以在你的主机上直接访问ChatGPT api

辅助模块utils

import re
from http.cookies import SimpleCookie
from requests.utils import cookiejar_from_dict
def parse_cookie_string(cookie_string):
    cookie = SimpleCookie()
    cookie.load(cookie_string)
    cookies_dict = {}
    cookiejar = None
    for k, m in cookie.items():
        cookies_dict[k] = m.value
        cookiejar = cookiejar_from_dict(cookies_dict, cookiejar=None, overwrite=True)
    return cookiejar
_no_elapse_chars = re.compile(r"([「」『』《》“”'\"()()]|(?<!-)-(?!-))", re.UNICODE)
def calculate_tts_elapse(text):
    # for simplicity, we use a fixed speed
    speed = 4.25  # this value is picked by trial and error
    # Exclude quotes and brackets that do not affect the total elapsed time
    return len(_no_elapse_chars.sub("", text)) / speed

这个模块主要是计算文字播放的时间和解析小米的token

模块config

LATEST_ASK_API = "https://userprofile.mina.mi.com/device_profile/v2/conversation?source=dialogu&hardware={hardware}×tamp={timestamp}&limit=2"
COOKIE_TEMPLATE = "deviceId={device_id}; serviceToken={service_token}; userId={user_id}"
HARDWARE_COMMAND_DICT = {
    "LX06": "5-1",
    "L05B": "5-3",
    "S12A": "5-1",
    "LX01": "5-1",
    "L06A": "5-1",
    "LX04": "5-1",
    "L05C": "5-3",
    "L17A": "7-3",
    "X08E": "7-3",
    "LX05A": "5-1",  
    "LX5A": "5-1",
    "L15A": "7-3",  
    # add more here
}
MI_USER = ""
MI_PASS = ""
OPENAI_API_KEY = ""
KEY_WORD = ["帮我", "请回答"]

这里需要注意的是,在这个模块中我添加了一个"L15A": "7-3" ,其中L15A 是我的音箱型号,"7-3" 是miservice库需要使用的文字转音频tts的命令,在我上面的介绍中介绍了你如何获取到这个命令码。

ChatGPT API

使用如下的代码作为你访问chapgpt的函数,函数中你需要使用你自己的openai_key,openai_key是在openai官网获得的,每个用户有18美元的免费额度。

import openai
class ChatGPTBot():
    def __init__(self, session, openai_key, api_base=None):
        self.session = session
        self.history = []
        self.api_base = api_base
        self.openai_key = openai_key
    async def ask(self, query):
        openai.api_key = self.openai_key
        if self.api_base:
            openai.api_base = self.api_base
        ms = []
        for h in self.history:
            ms.append({"role": "user", "content": h[0]})
            ms.append({"role": "assistant", "content": h[1]})
        ms.append({"role": "user", "content": f"{query}"})
        completion = openai.ChatCompletion.create(model="gpt-3.5-turbo", messages=ms)
        message = (
            completion["choices"][0]
            .get("message")
            .get("content")
            .encode("utf8")
            .decode()
        )
        self.history.append([f"{query}", message])
        return message

代码的主要逻辑是:

1 init_all_data 登陆智能音箱API,根据你的ID,密码获取返回的token

2 根据你的硬件型号找到你的机器

_init_data_hardware  这个函数用来根据我的型号L15A找到了我的智能音箱

3 使用do_tts函数使用智能音箱的文本转语音

4 使用get_if_xiaoai_is_playing这个函数判断小米音箱自己的回答是否结束了

5 调用chatgpt api 获取回答

6 通过米家tts接口让智能音箱回答。

7 在config.py中定义了gpt问话的头,这里定义为”请回答“,所以当你唤醒音箱后,使用”请回答“+你的问题,会触发ChatGPT的回答,其他的像播放音乐啊不会触发GPT回答

入口代码

if __name__=="__main__":
    app=MiGPT("型号","你的ID","你的密码","")
    asyncio.run(app.run_forever())

入口代码传入你的型号,你的ID,你的密码即可

如果无法访问到ChatGPT的服务器

这时你需要要么有一台自己的外网服务器,在这个服务器上搭建一个你的服务,来访问chatgpt的api,就像我做的这样,我定义的WXTChatBot模块,实际上是用post http请求了我的云端服务器的rest api,云端服务器将请求转发到chatgpt获取回答。

如果你没有自己的外网服务器的话,目前国内有很多用户开放了非常多的免费体验的套壳API,你可以使用他们的api封装一下免费使用。我体验过使用了多个免费api效果很不错,可以在头条里搜索几个。

总结:

借助于米家智能音箱的开放,我们一起DIY试试吧

© 版权声明

相关文章