GitHub Trending WeChat Push Bot: MetaGPT Study Notes on the OSS Subscription Agent

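Introduction

Background: personal study notes for Chapter 4 of MetaGPT agent development.

Technologies and methods used in this post:

  • Python tooling

  • A third-party WeChat official account used as the message-push channel, such as ServerChan (server酱), WxPusher, or PushPlus. This post uses WxPusher, for which you need to obtain a UID and a token.

  • Basic web-scraping concepts; if these are unfamiliar, ChatGPT can explain them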

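Main text

OSS subscription agent

1. Implementing the OSSWatcher Role

Before implementing the OSSWatcher Role, we first need to decide which tasks it should perform, that is, which Actions it needs. Our goal is to analyze trending open-source projects, so we must first fetch information about them. OSSWatcher therefore splits into two Actions: one crawls the trending projects, the other analyzes them.

1.1 Crawling GitHub Trending

  • Function: crawl the day's trending repositories, without filtering by spoken language or programming language, for analysis.
  • Method: scrape the page content directly; filter conditions can be added.

1.2 Summarizing GitHub Trending

  • Function: analyze the crawled project information and summarize it.
  • Implementation: the LLM analyzes the data according to a prompt and outputs the result in a specified format.

1.3 Implementing the OSSWatcher Role

  • Method: write the two Actions above into metagpt/actions/oss_trending.py, and create metagpt/roles/oss_watcher.py for the Role code.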
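The crawl-then-analyze split can be sketched without MetaGPT at all. The snippet below is only an illustration of the idea of running two steps in order and passing each result on as text; `crawl`, `analyze`, and `run_by_order` are hypothetical stand-ins, not MetaGPT APIs, and the crawl step returns canned data instead of touching the network.

```python
import asyncio


async def crawl(url: str) -> list[dict]:
    # Stand-in for the crawling Action: returns canned data, no network access.
    return [{"name": "octo/alpha", "language": "Python", "source": url}]


async def analyze(trending: str) -> str:
    # Stand-in for the LLM-backed analysis Action.
    return f"Report based on: {trending}"


async def run_by_order(url: str) -> str:
    """Run the steps in order, passing each result to the next step as text."""
    message = url
    for step in (crawl, analyze):
        result = await step(message)
        message = str(result)
    return message


if __name__ == "__main__":
    print(asyncio.run(run_by_order("https://github.com/trending")))
```

The point of the sketch is the data flow: the second step only ever sees the stringified output of the first.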

Implementation code:

    # Role implementation
    class OssWatcher(Role):
        # No trailing commas after the defaults: `name: str = "Codey",`
        # would turn each default into a one-element tuple.
        name: str = "Codey"
        profile: str = "OssWatcher"
        goal: str = "Generate an insightful GitHub Trending analysis report."
        constraints: str = "Only analyze based on the provided GitHub Trending data."

        def __init__(self, **kwargs) -> None:
            super().__init__(**kwargs)
            self._init_actions([CrawlOSSTrending, AnalysisOSSTrending])
            self._set_react_mode(react_mode="by_order")

        async def _act(self) -> Message:
            logger.info(f"{self._setting}: ready to {self._rc.todo}")
            # Actions are chosen in order under the hood:
            # todo is first CrawlOSSTrending, then AnalysisOSSTrending.
            todo = self._rc.todo

            msg = self.get_memories(k=1)[0]  # the most recent message
            result = await todo.run(msg.content)

            msg = Message(content=str(result))
            # msg = Message(content=str(result), role=self.profile, cause_by=type(todo))
            self._rc.memory.add(msg)
            return msg

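Implementing the Trigger

  • Trigger method: the simplest option is a timed trigger. crontab is the usual way to do this; the Python async library aiocron can also be used.
  • Implementation: a timed Trigger can be built on aiocron in either a function style or a class style.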
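As a rough illustration of the function style, the sketch below uses an async generator. It is deliberately not built on aiocron: it fires on a fixed interval via `asyncio.sleep` so that it runs with the standard library alone. `interval_trigger` and `collect` are hypothetical names, and the `limit` parameter exists only so the example terminates.

```python
import asyncio
from typing import AsyncIterator


async def interval_trigger(url: str, interval: float, limit: int) -> AsyncIterator[str]:
    """Yield `url` every `interval` seconds, `limit` times in total."""
    for _ in range(limit):
        await asyncio.sleep(interval)
        yield url


async def collect(url: str = "https://github.com/trending") -> list[str]:
    # Consume the trigger the way a subscription loop would: with `async for`.
    received = []
    async for item in interval_trigger(url, interval=0.01, limit=3):
        received.append(item)
    return received


if __name__ == "__main__":
    print(asyncio.run(collect()))
```

A class-style trigger exposes the same interface by implementing `__aiter__` and `__anext__`, which is convenient when the trigger needs to hold state such as a cron schedule.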

Implementation code:

    # Trigger
    class OssInfo(BaseModel):
        url: str
        timestamp: float = Field(default_factory=time.time)


    class GithubTrendingCronTrigger():
        def __init__(self, spec: str, tz: Optional[BaseTzInfo] = None, url: str = "https://github.com/trending") -> None:
            self.crontab = crontab(spec, tz=tz)
            self.url = url

        def __aiter__(self):
            return self

        async def __anext__(self):
            # Wait until the cron schedule fires, then emit the URL to crawl.
            await self.crontab.next()
            logger.info(self.url)
            logger.info(OssInfo(url=self.url))
            logger.info(Message(self.url))
            return Message(self.url)
            # return Message(self.url, OssInfo(url=self.url))

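Designing the Callback

  • Function: defines how the information generated by the agent is handled.
  • Implementation: send the information to an app you use every day, such as WeChat.

This post uses WxPusher, which requires WXPUSHER_TOKEN and WXPUSHER_UIDS.

WXPUSHER_TOKEN is WxPusher's APP_TOKEN; see the official documentation for how to obtain the appToken.

WXPUSHER_UIDS holds the user UIDs, which are listed on the application management page under "User Management -> User List". To send to several users, separate their UIDs with commas.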
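A minimal sketch of how the comma-separated UID string could be turned into a request payload. The field names mirror the payload used in this post's callback code; `parse_uids` and `build_payload` are hypothetical helpers, and the UIDs in the usage line are made-up placeholders.

```python
def parse_uids(raw: str) -> list[str]:
    """Split a comma-separated UID string, dropping blanks and stray spaces."""
    return [uid.strip() for uid in raw.split(",") if uid.strip()]


def build_payload(token: str, content: str, raw_uids: str, content_type: int = 3) -> dict:
    # Only a subset of the fields used in the post's callback is shown here.
    return {
        "appToken": token,
        "content": content,
        "contentType": content_type,  # the post's callback passes 3
        "uids": parse_uids(raw_uids),
    }


if __name__ == "__main__":
    print(build_payload("AT_example", "# hello", "UID_a, UID_b"))
```

Stripping whitespace and empty entries makes a value such as `"UID_a, UID_b,"` safe to use, which a bare `split(",")` would not.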

Implementation code:

    # callback
    class WxPusherClient:
        def __init__(self, token: Optional[str] = None, base_url: str = "http://wxpusher.zjiecode.com"):
            self.base_url = base_url
            self.token = token or WXPUSHER_TOKEN
            # self.token = token or os.environ["WXPUSHER_TOKEN"]

        async def send_message(
            self,
            content,
            summary: Optional[str] = None,
            content_type: int = 1,
            topic_ids: Optional[list[int]] = None,
            uids: Optional[list[str]] = None,  # UIDs are strings, not ints
            verify: bool = False,
            url: Optional[str] = None,
        ):
            payload = {
                "appToken": self.token,
                "content": content,
                "summary": summary,
                "contentType": content_type,
                "topicIds": topic_ids or [],
                "uids": uids or WXPUSHER_UIDS.split(","),
                # "uids": uids or os.environ["WXPUSHER_UIDS"].split(","),
                "verifyPay": verify,
                "url": url,
            }
            url = f"{self.base_url}/api/send/message"
            return await self._request("POST", url, json=payload)

        async def _request(self, method, url, **kwargs):
            async with aiohttp.ClientSession() as session:
                async with session.request(method, url, **kwargs, ssl=False) as response:
                    response.raise_for_status()
                    return await response.json()


    async def wxpusher_callback(msg: Message):
        client = WxPusherClient()
        await client.send_message(msg.content, content_type=3)

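A few small but essential configuration steps:

1. Proxy configuration: GitHub is hosted outside mainland China, so you may run into network problems. aiohttp does not use the system proxy by default, so the proxy has to be configured explicitly. Add your own proxy server to the config/key.yaml configuration file:

GLOBAL_PROXY: http://127.0.0.1:8118  # change to your own proxy server address

On Windows, Control Panel -> Network and Internet -> Internet Options -> Connections -> LAN settings shows the proxy address and port. Be sure to copy your own port; otherwise the connection will be refused.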
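As an alternative to reading the value from the Control Panel, the system proxy settings can usually be inspected from Python. The sketch below relies on the standard library's `urllib.request.getproxies()`, which returns a mapping from scheme to proxy URL; `pick_proxy` is a hypothetical helper, and whether the returned value is the right one for GLOBAL_PROXY depends on your own setup.

```python
from typing import Optional
from urllib.request import getproxies


def pick_proxy(proxies: dict[str, str]) -> Optional[str]:
    """Prefer the HTTPS proxy, fall back to HTTP, else return None."""
    return proxies.get("https") or proxies.get("http")


if __name__ == "__main__":
    # Prints the detected proxy URL, or None if no proxy is configured.
    print(pick_proxy(getproxies()))
```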

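2. Customizing the content: edit the prompt in the code below to focus on the topics and analysis angles you care about, for example:

  1. Programming language trends: look at the languages used in the Trending list to see which ones are currently most popular in the developer community

  2. Project types and purposes: analyze the projects in the Trending list to see which categories they belong to and what they are used for

  3. Community activity: check each project's star count and number of contributors

  4. Emerging technologies and tools: watch for new projects and emerging technologies to keep up with current trends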

    # English gloss of the Chinese instruction inside the prompt: "You are a GitHub
    # trending analyst who provides in-depth insights and personalized suggestions
    # based on the latest GitHub trends. Fill in the missing information below from
    # the context and produce an engaging, informative title so that users find
    # repositories matching their interests. Remember to answer in Chinese."
    TRENDING_ANALYSIS_PROMPT = """# Requirements
    您是GitHub趋势分析师,旨在根据最新的GitHub趋势为用户提供深入见解和个性化建议。根据上下文填写以下缺失信息,生成引人入胜且信息丰富的标题,确保用户发现与其兴趣相符的存储库,记得要中文。
    # The title about Today's GitHub Trending
    ## Today's Trends: Uncover the Hottest GitHub Projects Today! Explore the trending programming languages and discover key domains capturing developers' attention. From ** to **, witness the top projects like never before.
    ## The Trends Categories: Dive into Today's GitHub Trending Domains! Explore featured projects in domains such as ** and **. Get a quick overview of each project, including programming languages, stars, and more.
    ## Highlights of the List: Spotlight noteworthy projects on GitHub Trending, including new tools, innovative projects, and rapidly gaining popularity, focusing on delivering distinctive and attention-grabbing content for users.
    ---
    # Format Example
    ```
    # [Title]
    ## Today's Trends
    Today, ** and ** continue to dominate as the most popular programming languages. Key areas of interest include **, ** and **.
    The top popular projects are Project1 and Project2.
    ## The Trends Categories
    1. Generative AI
    - [Project1](https://github/xx/project1): [detail of the project, such as star total and today, language, ...]
    - [Project2](https://github/xx/project2): ...
    ...
    ## Highlights of the List
    1. [Project1](https://github/xx/project1): [provide specific reasons why this project is recommended].
    ...
    ```
    ---
    # Github Trending
    {trending}
    """
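To make the role of the `{trending}` placeholder concrete, here is a small sketch of filling such a template with crawled data, plus a language tally of the kind the first analysis angle asks for. `PROMPT_TEMPLATE` is a hypothetical, shortened stand-in for the real prompt, and `sample_repos` is invented sample data shaped like the crawler's output.

```python
from collections import Counter

# Hypothetical, shortened stand-in for TRENDING_ANALYSIS_PROMPT.
PROMPT_TEMPLATE = """# Requirements
Summarize the trending repositories below.
---
# Github Trending
{trending}
"""

# Invented sample data in the same shape as the crawler's output.
sample_repos = [
    {"name": "octo/alpha", "language": "Python", "today_stars": "120 stars today"},
    {"name": "octo/beta", "language": "Rust", "today_stars": "80 stars today"},
    {"name": "octo/gamma", "language": "Python", "today_stars": "45 stars today"},
]


def language_counts(repos: list[dict]) -> Counter:
    """Count repositories per language, skipping entries without one."""
    return Counter(r["language"] for r in repos if r.get("language"))


def render_prompt(repos: list[dict]) -> str:
    # str.format inserts str(repos) in place of {trending}.
    return PROMPT_TEMPLATE.format(trending=repos)


if __name__ == "__main__":
    print(language_counts(sample_repos).most_common())
    print(render_prompt(sample_repos))
```

One thing to keep in mind when editing the prompt: because it is filled with `str.format`, any literal curly braces added to the template need to be doubled (`{{` and `}}`).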

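3. Schedule: to see the generated output quickly, the author configured the bot to push every minute. To keep the bot running long term, switch to a fixed schedule. There are two simple ways to do this.

Method 1: change the spec parameter of the main function to a suitable cron expression so that the task fires at a specific time each day.

    # main function
    async def main(spec: str = "0 9 * * *", discord: bool = False, wxpusher: bool = True):
        ...

Method 2: with aiocron, cron syntax lets you configure the schedule very flexibly.

    from pytz import timezone
    beijing_tz = timezone('Asia/Shanghai')  # time zone for Beijing time
    cron_trigger = GithubTrendingCronTrigger("0 8 * * *", tz=beijing_tz)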
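To illustrate what a daily expression such as "0 9 * * *" (minute 0, hour 9, every day) means, the sketch below computes the next firing time by hand. It is not how aiocron works internally; `next_daily_run` is a hypothetical helper, and a fixed UTC+8 offset is used for Beijing time on the assumption that no daylight saving applies.

```python
from datetime import datetime, timedelta, timezone

# Fixed UTC+8 offset standing in for Beijing time (assumes no daylight saving).
BEIJING = timezone(timedelta(hours=8))


def next_daily_run(now: datetime, hour: int, minute: int = 0) -> datetime:
    """Next time a daily "<minute> <hour> * * *" schedule fires strictly after `now`."""
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= now:
        # Today's slot has already passed, so the next run is tomorrow.
        candidate += timedelta(days=1)
    return candidate


if __name__ == "__main__":
    print(next_daily_run(datetime.now(BEIJING), hour=9))
```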

Final result: [screenshot not included]

Complete code:

    import asyncio
    import os
    import time
    from typing import Any, AsyncGenerator, Awaitable, Callable, Optional

    # Set the API key before the metagpt imports below, as in the original listing.
    os.environ["ZHIPUAI_API_KEY"] = "fill in your own"

    import aiohttp
    from aiocron import crontab
    from bs4 import BeautifulSoup
    from pydantic import BaseModel, Field
    from pytz import BaseTzInfo

    from metagpt.actions.action import Action
    from metagpt.config import CONFIG
    from metagpt.logs import logger
    from metagpt.roles import Role
    from metagpt.schema import Message

    WXPUSHER_TOKEN = "fill in your own"
    WXPUSHER_UIDS = "fill in your own"


    # Subscription module. It can be imported with
    # `from metagpt.subscription import SubscriptionRunner`; the code is pasted here for reference.
    class SubscriptionRunner(BaseModel):
        """A simple wrapper to manage subscription tasks for different roles using asyncio.

        Example:
            import asyncio
            from metagpt.subscription import SubscriptionRunner
            from metagpt.roles import Searcher
            from metagpt.schema import Message

            async def trigger():
                while True:
                    yield Message("the latest news about OpenAI")
                    await asyncio.sleep(3600 * 24)

            async def callback(msg: Message):
                print(msg.content)

            async def main():
                pb = SubscriptionRunner()
                await pb.subscribe(Searcher(), trigger(), callback)
                await pb.run()

            asyncio.run(main())
        """

        tasks: dict[Role, asyncio.Task] = Field(default_factory=dict)

        class Config:
            arbitrary_types_allowed = True

        async def subscribe(
            self,
            role: Role,
            trigger: AsyncGenerator[Message, None],
            callback: Callable[
                [
                    Message,
                ],
                Awaitable[None],
            ],
        ):
            """Subscribes a role to a trigger and sets up a callback to be called with the role's response.

            Args:
                role: The role to subscribe.
                trigger: An asynchronous generator that yields Messages to be processed by the role.
                callback: An asynchronous function to be called with the response from the role.
            """
            loop = asyncio.get_running_loop()

            async def _start_role():
                async for msg in trigger:
                    logger.info("===log===" * 3)
                    logger.info(msg)
                    logger.info(msg.content)
                    resp = await role.run(msg.content)
                    await callback(resp)

            self.tasks[role] = loop.create_task(_start_role(), name=f"Subscription-{role}")

        async def unsubscribe(self, role: Role):
            """Unsubscribes a role from its trigger and cancels the associated task.

            Args:
                role: The role to unsubscribe.
            """
            task = self.tasks.pop(role)
            task.cancel()

        async def run(self, raise_exception: bool = True):
            """Runs all subscribed tasks and handles their completion or exception.

            Args:
                raise_exception: _description_. Defaults to True.

            Raises:
                task.exception: _description_
            """
            while True:
                for role, task in self.tasks.items():
                    if task.done():
                        if task.exception():
                            if raise_exception:
                                raise task.exception()
                            logger.opt(exception=task.exception()).error(f"Task {task.get_name()} run error")
                        else:
                            logger.warning(
                                f"Task {task.get_name()} has completed. "
                                "If this is unexpected behavior, please check the trigger function."
                            )
                        self.tasks.pop(role)
                        break
                else:
                    # No task finished in this pass; wait before polling again.
                    await asyncio.sleep(1)


    # Actions
    # The Chinese instruction inside the prompt tells the model to act as a GitHub
    # trending analyst, fill in the missing information, and answer in Chinese.
    TRENDING_ANALYSIS_PROMPT = """# Requirements
    您是GitHub趋势分析师,旨在根据最新的GitHub趋势为用户提供深入见解和个性化建议。根据上下文填写以下缺失信息,生成引人入胜且信息丰富的标题,确保用户发现与其兴趣相符的存储库,记得要中文。
    # The title about Today's GitHub Trending
    ## Today's Trends: Uncover the Hottest GitHub Projects Today! Explore the trending programming languages and discover key domains capturing developers' attention. From ** to **, witness the top projects like never before.
    ## The Trends Categories: Dive into Today's GitHub Trending Domains! Explore featured projects in domains such as ** and **. Get a quick overview of each project, including programming languages, stars, and more.
    ## Highlights of the List: Spotlight noteworthy projects on GitHub Trending, including new tools, innovative projects, and rapidly gaining popularity, focusing on delivering distinctive and attention-grabbing content for users.
    ---
    # Format Example
    ```
    # [Title]
    ## Today's Trends
    Today, ** and ** continue to dominate as the most popular programming languages. Key areas of interest include **, ** and **.
    The top popular projects are Project1 and Project2.
    ## The Trends Categories
    1. Generative AI
    - [Project1](https://github/xx/project1): [detail of the project, such as star total and today, language, ...]
    - [Project2](https://github/xx/project2): ...
    ...
    ## Highlights of the List
    1. [Project1](https://github/xx/project1): [provide specific reasons why this project is recommended].
    ...
    ```
    ---
    # Github Trending
    {trending}
    """


    class CrawlOSSTrending(Action):
        async def run(self, url: str = "https://github.com/trending"):
            async with aiohttp.ClientSession() as client:
                async with client.get(url, proxy=CONFIG.global_proxy, ssl=False) as response:
                    response.raise_for_status()
                    html = await response.text()

            soup = BeautifulSoup(html, 'html.parser')

            repositories = []

            for article in soup.select('article.Box-row'):
                repo_info = {}

                repo_info['name'] = article.select_one('h2 a').text.strip().replace("\n", "").replace(" ", "")
                repo_info['url'] = "https://github.com" + article.select_one('h2 a')['href'].strip()

                # Description
                description_element = article.select_one('p')
                repo_info['description'] = description_element.text.strip() if description_element else None

                # Language
                language_element = article.select_one('span[itemprop="programmingLanguage"]')
                repo_info['language'] = language_element.text.strip() if language_element else None

                # Stars and Forks
                stars_element = article.select('a.Link--muted')[0]
                forks_element = article.select('a.Link--muted')[1]
                repo_info['stars'] = stars_element.text.strip()
                repo_info['forks'] = forks_element.text.strip()

                # Today's Stars
                today_stars_element = article.select_one('span.d-inline-block.float-sm-right')
                repo_info['today_stars'] = today_stars_element.text.strip() if today_stars_element else None

                repositories.append(repo_info)

            return repositories


    class AnalysisOSSTrending(Action):
        async def run(
            self,
            trending: Any
        ):
            return await self._aask(TRENDING_ANALYSIS_PROMPT.format(trending=trending))


    # Role implementation
    class OssWatcher(Role):
        # No trailing commas after the defaults: `name: str = "Codey",`
        # would turn each default into a one-element tuple.
        name: str = "Codey"
        profile: str = "OssWatcher"
        goal: str = "Generate an insightful GitHub Trending analysis report."
        constraints: str = "Only analyze based on the provided GitHub Trending data."

        def __init__(self, **kwargs) -> None:
            super().__init__(**kwargs)
            self._init_actions([CrawlOSSTrending, AnalysisOSSTrending])
            self._set_react_mode(react_mode="by_order")

        async def _act(self) -> Message:
            logger.info(f"{self._setting}: ready to {self._rc.todo}")
            # Actions are chosen in order under the hood:
            # todo is first CrawlOSSTrending, then AnalysisOSSTrending.
            todo = self._rc.todo

            msg = self.get_memories(k=1)[0]  # the most recent message
            result = await todo.run(msg.content)

            msg = Message(content=str(result))
            # msg = Message(content=str(result), role=self.profile, cause_by=type(todo))
            self._rc.memory.add(msg)
            return msg


    # Trigger
    class OssInfo(BaseModel):
        url: str
        timestamp: float = Field(default_factory=time.time)


    class GithubTrendingCronTrigger():
        def __init__(self, spec: str, tz: Optional[BaseTzInfo] = None, url: str = "https://github.com/trending") -> None:
            self.crontab = crontab(spec, tz=tz)
            self.url = url

        def __aiter__(self):
            return self

        async def __anext__(self):
            # Wait until the cron schedule fires, then emit the URL to crawl.
            await self.crontab.next()
            logger.info(self.url)
            logger.info(OssInfo(url=self.url))
            logger.info(Message(self.url))
            return Message(self.url)
            # return Message(self.url, OssInfo(url=self.url))


    # callback
    class WxPusherClient:
        def __init__(self, token: Optional[str] = None, base_url: str = "http://wxpusher.zjiecode.com"):
            self.base_url = base_url
            self.token = token or WXPUSHER_TOKEN
            # self.token = token or os.environ["WXPUSHER_TOKEN"]

        async def send_message(
            self,
            content,
            summary: Optional[str] = None,
            content_type: int = 1,
            topic_ids: Optional[list[int]] = None,
            uids: Optional[list[str]] = None,  # UIDs are strings, not ints
            verify: bool = False,
            url: Optional[str] = None,
        ):
            payload = {
                "appToken": self.token,
                "content": content,
                "summary": summary,
                "contentType": content_type,
                "topicIds": topic_ids or [],
                "uids": uids or WXPUSHER_UIDS.split(","),
                # "uids": uids or os.environ["WXPUSHER_UIDS"].split(","),
                "verifyPay": verify,
                "url": url,
            }
            url = f"{self.base_url}/api/send/message"
            return await self._request("POST", url, json=payload)

        async def _request(self, method, url, **kwargs):
            async with aiohttp.ClientSession() as session:
                async with session.request(method, url, **kwargs, ssl=False) as response:
                    response.raise_for_status()
                    return await response.json()


    async def wxpusher_callback(msg: Message):
        client = WxPusherClient()
        await client.send_message(msg.content, content_type=3)


    # Entry point. The default spec "* * * * *" fires every minute;
    # the `discord` flag is accepted but not used in this version.
    async def main(spec: str = "* * * * *", discord: bool = False, wxpusher: bool = True):
        callbacks = []
        if wxpusher:
            callbacks.append(wxpusher_callback)

        if not callbacks:
            # Fall back to printing when no push channel is enabled.
            async def _print(msg: Message):
                print(msg.content)

            callbacks.append(_print)

        async def callback(msg):
            await asyncio.gather(*(call(msg) for call in callbacks))

        runner = SubscriptionRunner()
        await runner.subscribe(OssWatcher(), GithubTrendingCronTrigger(spec), callback)
        await runner.run()


    if __name__ == "__main__":
        import fire

        fire.Fire(main)
