赞
踩
1 分钟能做什么?集成 ChatGPT 到自己的公众号,小程序或者 APP?集成各种形式的 Stable Diffusion,让 AIGC 帮助自己的项目更有趣,更生动?本教程将会教大家如何 1 分钟高效集成 ChatGPT,Stable Diffusion 等 AIGC 模型到自己的项目中。(文末有视频和代码案例)
1. 注册并登录 https://aigcaas.cn网站
2. 点击左侧 API Explorer,并选择分类“聊天机器人”,选择应用 ChatGPT,并选择Chat Completion:
3. 在中间,按照提示的格式,输入对应的参数信息,例如:[{"role":"user","content":"你好"}],并点击右侧的在线调用→发送请求按钮:
稍等片刻,即可看到接口完成调用,已经获得请求结果。如果想要进行流式响应,可以在流式响应输入框中输入stream,例如:
4. 点击右侧的案例代码,根据自己需要获得对应的案例代码:
5. 此时,可以将代码复制到本地,对代码的第12行-13行进行修改:
点击页面左侧密钥列表→新建,创建密钥信息:
将密钥信息对应复制到代码12-13行:
6. 完成粘贴之后,即可进行代码的测试,例如执行代码(可能涉及到一些依赖的安装,不同语言安装依赖方式略有不同,可以自行安装依赖):
完成调试之后,即可和自己的项目做集成。当然也可以根据自己的需求对参数内容进行相对应的优化和调整。
Stable Diffusion 是目前非常火热的文生图工具,根据文字即可生成图片, AIGCaaS 平台拥有数十款 Stable Diffusion 应用,例如以哈士奇作为关键词,进行不同类型,不同风格的图像生成,效果如下:
接口获取的方式与 ChatGPT 基本一致:
1. 点击左侧 API Explorer,并选择分类“图像生成”,选择一个自己需要的场景,例如“Open Journey”:
2. 输入提示词,例如:1girl, white hair, golden eyes, beautiful eyes, detail, flower meadow, cumulonimbus clouds, lighting, detailed sky, garden,并点击发送请求按钮:
多次生成,超看效果:
当然,不同模型的表现可能不同,例如二次元生成的效果:
3. 完成图片的生成,可以点击右侧的案例代码,根据自己需要获得对应的案例代码:
4. 同样要对13-14行代码进行密钥信息的替换,替换完成即可在本地进行代码的运行:此时,通过返回的链接地址,下载数据即可:
以 Python 语言为例,通过 Flask 框架作为后端框架:
- # coding:utf-8
- import os
- import time
- import json
- import redis
- import random
- import base64
- import hashlib
- import logging
- import requests
- import xmltodict
- import urllib.parse
- import urllib.request
- from flask import Flask, request, abort, Response, render_template_string
-
- # 获取随机字符串
- random_str = lambda count=100: "".join(random.sample('zyxwvutsrqponmlkjihgUIOPLKJHGFDSAZXCVBNM' * 10, count))
-
- # 日志配置
- logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
- logger = logging.getLogger(__name__)
-
- # 常量
- WECHAT_TOKEN = "公众号 Token"
- WECHAT_APPID = "公众号 APPID"
- WECHAT_APPSECRET = "公众号 APP Secret"
- WECHAT_ENCODINGAESKEY = '公众号 Encoding AES Key'
- secret_id = 'AIGCaaS 平台密钥' # 密钥信息
- secret_key = 'AIGCaaS 平台密钥' # 密钥信息
-
- db_config = {
- "redis": {
- "server": "192.168.211.1",
- "port": 6379,
- }
- }
-
- app = Flask(__name__)
-
- redis_conn = redis.Redis(host=db_config["redis"]["server"], port=db_config["redis"]["port"])
-
- # 签名对象
- getSha256 = lambda content: hashlib.sha256(content.encode("utf-8")).hexdigest()
-
- getTextResponse = lambda xml_dict, response_data: {
- "xml":
- {
- "ToUserName": xml_dict.get("FromUserName"),
- "FromUserName": xml_dict.get("ToUserName"),
- "CreateTime": int(time.time()),
- "MsgType": "text",
- "Content": response_data
- }
- }
-
-
- def chatgpt(user, content):
- try:
- messages = redis_conn.get('user-%s' % (user))
- messages = json.loads(messages.decode("utf-8")) if messages else []
- messages.append({"role": "user", "content": content})
- redis_conn.setex('user-%s' % (user), 60 * 60 * 24 * 7, json.dumps(messages))
- url = "https://api.aigcaas.cn/product/%s/api/%s" % ('chatgpt_chat', 'chat_com')
- # 构建请求头
- nonce = str(random.randint(1, 10000))
- timestamp = str(int(time.time()))
- headers = {
- 'SecretID': secret_id,
- 'Nonce': nonce,
- 'Token': getSha256(("%s%s%s" % (timestamp, secret_key, nonce))),
- 'Timestamp': timestamp,
- 'Content-Type': 'application/json'
- }
- # 构建请求 body
- data = {"messages": messages}
- # 获取响应
- response = requests.request("POST", url, headers=headers, data=json.dumps(data))
- result = json.loads(response.text)
- logger.debug("result: %s" % result)
- if result.get("status") == "Error":
- redis_conn.delete('user-%s' % (user))
- redis_conn.setex('user-last-text-message-%s' % user, 600, "系统检测到敏感信息,已经屏蔽,请重试")
- else:
- messages.append(result["choices"][0]["message"])
- messages = messages[-10:]
- redis_conn.setex('user-%s' % (user), 60 * 60 * 24, json.dumps(messages))
- return result["choices"][0]["message"]["content"]
- except Exception as e:
- logger.error(e)
- return e
-
-
- @app.route("/mp", methods=["GET", "POST"])
- def wechat():
- # 接收微信服务器发送参数
- signature = request.args.get("signature")
- timestamp = request.args.get("timestamp")
- nonce = request.args.get("nonce")
- logger.debug([signature, timestamp, nonce])
-
- if not all([signature, timestamp, nonce]):
- abort(400)
-
- li = [WECHAT_TOKEN, timestamp, nonce]
- li.sort()
- tmp_str = "".join(li)
- sign = hashlib.sha1(tmp_str.encode("utf-8")).hexdigest() # 进行sha1加密, 得到正确的签名值
-
- # 将自己计算的签名值, 与请求的签名参数进行对比, 如果相同, 则证明请求来自微信
- if signature != sign:
- abort(403)
-
- if request.method == "GET":
- return request.args.get("echostr")
-
- xml_str = request.data
-
- if not xml_str:
- abort(400)
-
- # 对xml字符串进行解析成字典
- xml_dict = xmltodict.parse(xml_str)
- xml_dict = xml_dict.get("xml")
- logger.debug('xml_str: %s' % xml_str)
- logger.debug('xml_dict: %s' % xml_dict)
- user = xml_dict.get("FromUserName")
-
- # 新的信息处理
- resp_dict = getTextResponse(xml_dict, "感谢您关注 AIGC Hub")
- if xml_dict.get("MsgType") == "text":
- response_data = chatgpt(user, xml_dict.get("Content"))
- resp_dict = getTextResponse(xml_dict, response_data)
- resp_xml_str = xmltodict.unparse(resp_dict)
- logger.debug(resp_xml_str)
- return resp_xml_str
-
-
- if __name__ == '__main__':
- app.run(host='0.0.0.0', port=8000, debug=True, threaded=True)
整个项目非常简单:服务端接收到请求,将文字信息提取出来xml_dict.get("Content")
,并通过chatgpt方法进行结果的获取,并将结果返回给用户。但是这里遇到一个比较尴尬的事情,因为 AIGC 模型往往运行时间会比较长,可能超过微信公众号的 5 秒钟要求,所以此处可以进行额外的处理:
1. 服务端接收到请求,将文字信息提取出来xml_dict.get("Content")
2. 异步调用chatgpt
方法,如果 5 秒钟没有响应,则返回一个友好的提示:系统正在进行处理,请稍后回复"继续"查看结果,如果获得到了结果,则进行结果的返回
3. 如果没有拿到结果,用户稍后发送继续时,系统将结果返回给客户端
整体的实现逻辑为:
- # coding:utf-8
- import os
- import time
- import json
- import redis
- import random
- import base64
- import hashlib
- import logging
- import requests
- import xmltodict
- import urllib.parse
- import urllib.request
- from flask import Flask, request, abort, Response, render_template_string
- from concurrent.futures import ThreadPoolExecutor
-
- # 创建线程池执行器
- executor = ThreadPoolExecutor(2)
-
- # 获取随机字符串
- random_str = lambda count=100: "".join(random.sample('zyxwvutsrqponmlkjihgUIOPLKJHGFDSAZXCVBNM' * 10, count))
-
- # 日志配置
- logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
- logger = logging.getLogger(__name__)
-
- # 常量
- WECHAT_TOKEN = "公众号 Token"
- WECHAT_APPID = "公众号 APPID"
- WECHAT_APPSECRET = "公众号 APP Secret"
- WECHAT_ENCODINGAESKEY = '公众号 Encoding AES Key'
- secret_id = 'AIGCaaS 平台密钥' # 密钥信息
- secret_key = 'AIGCaaS 平台密钥' # 密钥信息
-
- db_config = {
- "redis": {
- "server": "192.168.211.1",
- "port": 6379,
- }
- }
-
- app = Flask(__name__)
-
- redis_conn = redis.Redis(host=db_config["redis"]["server"], port=db_config["redis"]["port"])
-
- # 签名对象
- getSha256 = lambda content: hashlib.sha256(content.encode("utf-8")).hexdigest()
-
- getTextResponse = lambda xml_dict, response_data: {
- "xml":
- {
- "ToUserName": xml_dict.get("FromUserName"),
- "FromUserName": xml_dict.get("ToUserName"),
- "CreateTime": int(time.time()),
- "MsgType": "text",
- "Content": response_data
- }
- }
-
-
- def chatgpt(user, content):
- try:
- messages = redis_conn.get('user-%s' % (user))
- messages = json.loads(messages.decode("utf-8")) if messages else []
- messages.append({"role": "user", "content": content})
- redis_conn.setex('user-%s' % (user), 60 * 60 * 24 * 7, json.dumps(messages))
- url = "https://api.aigcaas.cn/product/%s/api/%s" % ('chatgpt_chat', 'chat_com')
- # 构建请求头
- nonce = str(random.randint(1, 10000))
- timestamp = str(int(time.time()))
- headers = {
- 'SecretID': secret_id,
- 'Nonce': nonce,
- 'Token': getSha256(("%s%s%s" % (timestamp, secret_key, nonce))),
- 'Timestamp': timestamp,
- 'Content-Type': 'application/json'
- }
- # 构建请求 body
- data = {"messages": messages}
- # 获取响应
- response = requests.request("POST", url, headers=headers, data=json.dumps(data))
- result = json.loads(response.text)
- logger.debug("result: %s" % result)
- if result.get("status") == "Error":
- redis_conn.delete('user-%s' % (user))
- redis_conn.setex('user-last-text-message-%s' % user, 600, "系统检测到敏感信息,已经屏蔽,请重试")
- else:
- messages.append(result["choices"][0]["message"])
- messages = messages[-10:]
- redis_conn.setex('user-%s' % (user), 60 * 60 * 24, json.dumps(messages))
- redis_conn.setex('user-last-text-message-%s' % user, 600, result["choices"][0]["message"]["content"])
- except Exception as e:
- logger.error(e)
- redis_conn.delete('user-%s' % (user))
-
-
- def get_access_token():
- url = 'https://api.weixin.qq.com/cgi-bin/token?grant_type=client_credential&appid=%s&secret=%s' % (
- WECHAT_APPID, WECHAT_APPSECRET)
- response_data = urllib.request.urlopen(url).read().decode("utf-8")
- logger.debug("response_data: %s" % response_data)
- return json.loads(response_data)["access_token"]
-
-
- @app.route("/mp", methods=["GET", "POST"])
- def wechat():
- # 接收微信服务器发送参数
- signature = request.args.get("signature")
- timestamp = request.args.get("timestamp")
- nonce = request.args.get("nonce")
- logger.debug([signature, timestamp, nonce])
-
- if not all([signature, timestamp, nonce]):
- abort(400)
-
- li = [WECHAT_TOKEN, timestamp, nonce]
- li.sort()
- tmp_str = "".join(li)
- sign = hashlib.sha1(tmp_str.encode("utf-8")).hexdigest() # 进行sha1加密, 得到正确的签名值
-
- # 将自己计算的签名值, 与请求的签名参数进行对比, 如果相同, 则证明请求来自微信
- if signature != sign:
- abort(403)
-
- if request.method == "GET":
- return request.args.get("echostr")
-
- xml_str = request.data
-
- if not xml_str:
- abort(400)
-
- # 对xml字符串进行解析成字典
- xml_dict = xmltodict.parse(xml_str)
- xml_dict = xml_dict.get("xml")
- logger.debug('xml_str: %s' % xml_str)
- logger.debug('xml_dict: %s' % xml_dict)
- user = xml_dict.get("FromUserName")
-
- # 对超时信息进行处理
- resp_dict = None
- if xml_dict.get("MsgType") == "text" and '继续' == xml_dict.get("Content").strip():
- resp_dict = getTextResponse(xml_dict, '系统还在处理中,烦请再等一下发送"继续"查看结果')
- text_status = redis_conn.get('user-last-text-message-status-%s' % user)
- if text_status:
- response_data = redis_conn.get('user-last-text-message-%s' % user)
- if response_data:
- redis_conn.delete('user-last-text-message-status-%s' % user)
- redis_conn.delete('user-last-text-message-%s' % user)
- resp_dict = getTextResponse(xml_dict, response_data.decode("utf-8"))
- else:
- resp_dict = None
-
- # 新的信息处理
- if not resp_dict:
- redis_conn.delete('user-last-text-message-status-%s' % user)
- redis_conn.delete('user-last-text-message-%s' % user)
- # 配置默认信息
- resp_dict = getTextResponse(xml_dict, "感谢您关注 AIGC Hub")
- if xml_dict.get("MsgType") == "text":
- redis_conn.setex('user-last-text-message-status-%s' % user, 600, '1')
- executor.submit(chatgpt, user, xml_dict.get("Content"))
- response_data = '系统正在进行处理,请稍后回复"继续"查看结果'
- for i in range(1, 23):
- time.sleep(0.2)
- temp_response_data = redis_conn.get('user-last-text-message-%s' % user)
- if temp_response_data:
- response_data = temp_response_data.decode("utf-8")
- redis_conn.delete('user-last-text-message-status-%s' % user)
- break
- resp_dict = getTextResponse(xml_dict, response_data)
- resp_xml_str = xmltodict.unparse(resp_dict)
- logger.debug(resp_xml_str)
- return resp_xml_str
-
-
- if __name__ == '__main__':
- app.run(host='0.0.0.0', port=8000, debug=True, threaded=True)
上面为大家介绍了如何用 1 分钟获取多种形式的 AIGC 应用,包括不限于 ChatGPT,Stable Diffusion 等,并通过与微信公众号集成的方式,为大家进行了简单的代码分享。当然,除了与公众号的集成还可以与小程序的集成,还可以与其他更多种形式的应用进行集成,AIGCaaS 平台系统为开发者提供开箱即用的 AIGC 应用 API,可以让开发者更简单,更方便,更快速的使用多种形式的 AIGC 应用。
常见问题:
1. 为什么我生成的 Stable Diffusion 图片是黑色的?在某些时候,系统进行了敏感检测,如果生成的图片存在敏感信息,系统会自动进行屏蔽;
2. 这个 AIGCaaS 项目收费么?AIGCaaS 为开发者提供了大量的低价 API,注册即赠送大量的试用与体验机会(以 ChatGPT 为例,可以体验大概2500次),即便超过免费体验此时,开发者也可以申请永久免费计划,进行长期的“薅羊毛”;
3. 使用过程中出现问题,如何联系技术支持?技术支持微信号:
envless
,有任何问题随时联系;
AIGC 爱好者交流群
------
我们创建了一个高质量的技术交流群,与优秀的人在一起,自己也会优秀起来,赶紧点击加群,享受一起成长的快乐。另外,如果你最近想跳槽的话,年前我花了2周时间收集了一波大厂面经,节后准备跳槽的可以点击这里领取!
··································
你好,我是程序猿DD,10年开发老司机、阿里云MVP、腾讯云TVP、出过书创过业、国企4年互联网6年。从普通开发到架构师、再到合伙人。一路过来,给我最深的感受就是一定要不断学习并关注前沿。只要你能坚持下来,多思考、少抱怨、勤动手,就很容易实现弯道超车!所以,不要问我现在干什么是否来得及。如果你看好一个事情,一定是坚持了才能看到希望,而不是看到希望才去坚持。相信我,只要坚持下来,你一定比现在更好!如果你还没什么方向,可以先关注我,这里会经常分享一些前沿资讯,帮你积累弯道超车的资本。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。