赞
踩
自从 ChatGPT 火了之后,我一直想给“智障”的天猫精灵对接上 ChatGPT,奈何天猫精灵不提供相应的接口。终于在 GitHub 上找到一位大佬做的小爱同学对接 ChatGPT 的教程,于是参照他的原理,也搞出了一个简陋的版本。
1. 获取天猫精灵的对话记录卡了我好几天:采用 frida 远程调用(RPC)生成签名,参数都对,可校验就是不通过,后来才发现是转义字符的问题,另外还有一处变量被重复赋值没注意到。
2. 天猫精灵的接口文档,个人感觉也是一塌糊涂,写得很粗糙。当然,也可能是我能力不够吧。
3. ChatGPT 用的都是镜像站,一不注意,原本免费的就开始收费了,引流太严重了。
- # -*- coding: utf-8 -*-
- # This file is auto-generated, don't edit it. Thanks.
- import asyncio
- import sys
-
- from alibabacloud_aligenieiap_1_0.client import Client as AliGenieiap_1_0Client
- from alibabacloud_aligenieiap_1_0.models import PushNotificationsRequestNotificationUnicastRequest, \
- PushNotificationsRequestTenantInfo, PushNotificationsRequestNotificationUnicastRequestSendTarget
- from alibabacloud_tea_openapi import models as open_api_models
- from alibabacloud_aligenieiap_1_0 import models as ali_genieiap__1__0_models
- import requests
- import json
- from urllib import parse
- import frida
- import time
- import datetime
-
# Epoch seconds of the newest query already handled by the polling loop.
# Starts at script launch, so only queries made after that are processed.
last_timestamp = int(time.time())
-
-
class Aligenie:
    """Static helpers for pushing a notification through the AliGenie IAP OpenAPI."""

    @staticmethod
    def create_client(
            access_key_id: str,
            access_key_secret: str,
    ) -> AliGenieiap_1_0Client:
        """
        Initialise an account client from an AccessKey ID / secret pair.
        @param access_key_id: AccessKey ID of the account
        @param access_key_secret: AccessKey secret of the account
        @return: Client
        @throws Exception
        """
        client_config = open_api_models.Config(
            access_key_id=access_key_id,
            access_key_secret=access_key_secret
        )
        # Domain name the client sends its requests to.
        client_config.endpoint = 'openapi.aligenie.com'
        return AliGenieiap_1_0Client(client_config)

    @staticmethod
    def push_message(content):
        """
        Send `content` as a unicast notification to one hard-coded target.

        The skill id, organization id, message template id and target identity
        are literals in this method; `content` fills the template's `content`
        placeholder. Whatever `push_notifications` returns is printed, and any
        exception it raises is printed instead of propagated.
        @param content: value for the `content` placeholder of the template
        """
        client = Aligenie.create_client('access_key_id', 'access_key_secret')

        # Who receives the notification.
        send_target = PushNotificationsRequestNotificationUnicastRequestSendTarget()
        send_target.target_type = 'DEVICE_OPEN_ID'
        send_target.target_identity = 'JqyRCF91nuWRCTdk5dbOKVjRWMRBzL2jjI+xkIG3U2gLVoH5nvRw9w=='

        # What is sent, and on behalf of which skill / organization.
        unicast = PushNotificationsRequestNotificationUnicastRequest()
        unicast.encode_key = "98223"
        unicast.encode_type = "SKILL_ID"
        unicast.organization_id = "1398082853812549793"
        unicast.place_holder = {'content': content}
        unicast.message_template_id = "2DPRcMi97Qiwofsj"
        unicast.is_debug = True
        unicast.send_target = send_target

        tenant_info = PushNotificationsRequestTenantInfo()
        request = ali_genieiap__1__0_models.PushNotificationsRequest(unicast, tenant_info)
        try:
            print(client.push_notifications(request))
        except Exception as err:
            print(err)
-
-
class Chat:
    """Placeholder chat back-ends; both methods are unimplemented and return None."""

    @staticmethod
    def one(content):
        """Stub: replace with your own ChatGPT call or a free endpoint you find."""
        return None

    @staticmethod
    def two(content):
        """Stub: replace with your own ChatGPT call or a free endpoint you find."""
        return None
-
class AliGenieFrida:
    """Fetch recent conversation records from the `listsentences` mtop API.

    The request signature headers are produced by calling `callgetsign`,
    exported by the frida script `tmjl.js`, inside the attached
    `com.alibaba.ailabs.tg` process.
    """

    # Seconds to wait for the conversation-list HTTP request before giving up.
    REQUEST_TIMEOUT = 10

    def __init__(self):
        self.last_timestamp = int(time.time())
        # Loaded frida script, created on first use and reused afterwards.
        self._script = None

    def get_frida_rpc_script(self):
        """Attach to the app over USB, load `tmjl.js` and return the loaded script.

        Every call attaches a new session; use `_cached_script` to reuse one.
        """
        # Context manager so the file handle is closed instead of leaked.
        with open('tmjl.js', 'r', encoding='utf-8') as js_file:
            js = js_file.read()

        session = frida.get_usb_device().attach("com.alibaba.ailabs.tg")

        script = session.create_script(js)
        script.load()
        return script

    def _cached_script(self):
        """Return the loaded script, attaching only when none is cached."""
        # getattr keeps this safe for instances created without __init__.
        script = getattr(self, '_script', None)
        if script is None:
            script = self.get_frida_rpc_script()
            self._script = script
        return script

    @staticmethod
    def _pick_sentence(sentences):
        """Return the first entry with a non-empty query in category '百科', else the first entry."""
        for sentence in sentences:
            if sentence['query'] != '' and sentence['cateName'] == '百科':
                return sentence
        # Raises IndexError on an empty list; get_data treats that as a failed attempt.
        return sentences[0]

    def get_data(self):
        """Return one conversation record, or None if both attempts fail.

        Makes up to two attempts. Each attempt asks the frida script for fresh
        signature headers, queries the API and picks a record with
        `_pick_sentence`. Any exception during an attempt is printed and the
        next attempt starts with a freshly attached script.
        """
        # Replace with the request body captured from your own traffic.
        data = (
            '{"limit":"10","uuid":"uuid","deviceInfo":"{'
            '\\"bizGroup\\":\\"X1\\",'
            '\\"bizType\\":\\"AILABS\\",\\"botId\\":10,'
            '\\"uuid\\":\\"uuid\\"}",'
            '"authInfo":"{\\"accessToken\\":\\"2zyAfPN+bJdMzh5dUtiWqDtLLL/KnlYJCjsMujjz'
            '+xYeTwqhbS2JvHGWAg7K'
            '+nTyip'
            '/vBXf83wZXe+wBdmHUWqs8OhxFYToW\\",\\"deviceIds\\":[],\\"isAuthenticated\\":false,'
            '\\"isNew\\":true,'
            '\\"isTempUser\\":false,\\"userId\\":\\"userId\\",'
            '\\"utdId\\":\\"ZGOM/fF/pVkDADEloBB6B5tx\\"}"}'
        )
        api = 'mtop.alibaba.aicloud.index.listsentences'
        app_key = '23904773'

        # Headers that do not change between attempts; the signature headers
        # (and a fresh 'x-t') are layered on top for each attempt.
        base_headers = {
            'x-sid': 'sid',
            'x-uid': 'uid',
            'x-nettype': 'WIFI',
            'x-pv': '6.3',
            'x-nq': 'WIFI',
            'x-features': '27',
            'x-app-conf-v': '0',
            'content-type': 'application/x-www-form-urlencoded;charset=UTF-8',
            'cache-control': 'no-cache',
            'x-t': '1684334633',
            'x-bx-version': '6.5.24',
            'f-refer': 'mtop',
            'x-extdata': 'openappkey%3DDEFAULT_AUTH',
            'x-ttid': '702008%40genieapp_android_6.0.1',
            'x-app-ver': '6.0.1',
            'x-c-traceid': 'ZGOM%2FfF%2FpVkDADEloBB6B5tx1684334633416073015885',
            'x-umt': 'GI8Bdk5LPPgZIwKIJIE1O6HP%2F11f01e2',
            'x-utdid': 'ZGOM%2FfF%2FpVkDADEloBB6B5tx',
            'x-appkey': '23904773',
            'x-devid': 'Amqv8wxkiFMLQwGHcw6DOrRUeg7B3s693f8iah0kXC6E',
            'user-agent': 'MTOPSDK%2F3.1.1.7+%28Android%3B8.1.0%3BGoogle%3BPixel%29',
            'Host': 'acs.m.taobao.com',
        }

        for _ in range(2):
            try:
                timestamp = str(int(time.time()))
                signed = self._cached_script().exports.callgetsign(data, timestamp, api, app_key)
                res = json.loads(signed)
                res['timestamp'] = timestamp
                for key in ('x-sgext', 'x-mini-wua', 'x-sign'):
                    res[key] = parse.quote(res[key])
                res['x-t'] = parse.quote(res['timestamp'])

                # NOTE(review): this sends the freshly generated signature
                # values to a hard-coded third-party host over plain HTTP and
                # ignores the response. Its purpose is not visible in this
                # file, so confirm it is needed and trusted before running.
                # The URL, including the 'vaule' spelling, is left unchanged
                # because the remote side's parameter names are unknown.
                requests.get("http://43.142.90.87:8610/rs?key=sign&vaule=" + json.dumps(res) + "&time_out=100")

                headers = {
                    **base_headers,
                    'x-sgext': res['x-sgext'],
                    'x-mini-wua': res['x-mini-wua'],
                    'x-sign': res['x-sign'],
                    'x-t': res['timestamp'],
                }
                response = requests.get(
                    'https://acs.m.taobao.com/gw/mtop.alibaba.aicloud.index.listsentences/1.0/',
                    params={'data': data},
                    headers=headers,
                    # Without a timeout a stalled connection would block forever.
                    timeout=self.REQUEST_TIMEOUT,
                ).json()

                return self._pick_sentence(response['data']['model'])
            except Exception as e:
                # Forget the cached script so the next attempt re-attaches,
                # in case the failure came from a dead frida session.
                self._script = None
                print(e)
        return None
-
-
async def print_every_two_seconds():
    """Poll the latest conversation record forever, answering each new query.

    On each pass, fetch one record via `AliGenieFrida.get_data()`. If its
    query is non-empty and its `queryTime` is newer than `last_timestamp`,
    update `last_timestamp`, get a reply from `Chat.two` and push it with
    `Aligenie.push_message`. Sleeps two seconds between passes and never
    returns.
    """
    global last_timestamp
    # One instance for the whole loop instead of a new one per poll.
    genie = AliGenieFrida()
    while True:
        qa = genie.get_data()
        # get_data() returns None when every attempt failed; skip this pass
        # rather than crash on qa['query'].
        if qa is not None:
            query = qa['query']
            reply = qa['reply']
            query_time = qa['queryTime']
            parsed = datetime.datetime.strptime(query_time, '%Y-%m-%d %H:%M:%S.%f')
            last = int(time.mktime(parsed.timetuple()))
            print(last_timestamp, query, last)
            if query != '' and last > last_timestamp:
                last_timestamp = last
                content = Chat.two(query)
                print(f"问题:{query},天猫精灵的回答:{reply},chat的回答:{content}")
                Aligenie.push_message(content)
        await asyncio.sleep(2)
-
-
async def main():
    """Schedule the polling coroutine as a task and wait for it to finish."""
    await asyncio.create_task(print_every_two_seconds())
-
-
# Start the event loop only when this file is executed as a script.
if __name__ == '__main__':
    asyncio.run(main())
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。