Blog background: my personal study notes for Chapter 4 of MetaGPT agent development.
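Prerequisites:
- Python tooling
- A third-party WeChat Official Account push service for delivering messages, such as Server酱 (ServerChan), WxPusher, or PushPlus. This post uses WxPusher, for which you need to obtain a UID and a token.
- Basic web-crawling concepts; if these are unfamiliar, you can ask ChatGPT.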
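To make the crawling idea concrete without any network access, here is a minimal standard-library sketch. It parses a hard-coded HTML fragment whose shape (an `article` with class `Box-row` wrapping an `h2 > a` link) mirrors the selectors used by the crawler later in this post. The fragment and the `TrendingParser` class are illustrative assumptions; GitHub's real markup is larger and may change.

```python
from html.parser import HTMLParser


class TrendingParser(HTMLParser):
    """Hypothetical helper: collect repo URLs from <article class="Box-row"><h2><a href=...>."""

    def __init__(self) -> None:
        super().__init__()
        self.repos: list[str] = []
        self._in_row = False
        self._in_h2 = False

    def handle_starttag(self, tag, attrs):
        attrs = dict(attrs)
        if tag == "article" and "Box-row" in (attrs.get("class") or "").split():
            self._in_row = True
        elif tag == "h2" and self._in_row:
            self._in_h2 = True
        elif tag == "a" and self._in_h2 and attrs.get("href"):
            # href looks like "/owner/repo"; turn it into an absolute URL
            self.repos.append("https://github.com" + attrs["href"].strip())

    def handle_endtag(self, tag):
        if tag == "h2":
            self._in_h2 = False
        elif tag == "article":
            self._in_row = False


SAMPLE_HTML = """
<article class="Box-row">
  <h2 class="h3 lh-condensed"><a href="/owner/project1">owner / project1</a></h2>
  <p>An example description</p>
</article>
<article class="Box-row">
  <h2 class="h3 lh-condensed"><a href="/owner/project2">owner / project2</a></h2>
</article>
"""

parser = TrendingParser()
parser.feed(SAMPLE_HTML)
print(parser.repos)
```

The real crawler below uses BeautifulSoup CSS selectors instead, which is shorter; this version only shows the underlying idea of walking tags and picking out the link inside each trending row.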
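Before implementing the OSSWatcher Role, we first need to pin down which tasks OSSWatcher should perform, that is, which Actions to implement. Since our goal is to analyze popular open-source projects, we first need to fetch information about them. Based on this, OSSWatcher can be split into two Actions: one that crawls the trending open-source projects and one that analyzes them.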
1.1 Crawling GitHub Trending
The Action code goes in the `metagpt/actions/oss_trending.py` file; then create a `metagpt/roles/oss_watcher.py` file for the Role code:

```python
# Role implementation
from metagpt.actions.oss_trending import AnalysisOSSTrending, CrawlOSSTrending
from metagpt.logs import logger
from metagpt.roles import Role
from metagpt.schema import Message


class OssWatcher(Role):
    # No trailing commas: `name: str = "Codey",` would make the default a tuple
    name: str = "Codey"
    profile: str = "OssWatcher"
    goal: str = "Generate an insightful GitHub Trending analysis report."
    constraints: str = "Only analyze based on the provided GitHub Trending data."

    def __init__(self, **kwargs) -> None:
        super().__init__(**kwargs)
        self._init_actions([CrawlOSSTrending, AnalysisOSSTrending])
        self._set_react_mode(react_mode="by_order")

    async def _act(self) -> Message:
        logger.info(f"{self._setting}: ready to {self._rc.todo}")
        # In "by_order" mode the Actions run in the order registered above:
        # first CrawlOSSTrending, then AnalysisOSSTrending
        todo = self._rc.todo

        msg = self.get_memories(k=1)[0]  # the most recent message is this step's input
        result = await todo.run(msg.content)

        msg = Message(content=str(result))
        # msg = Message(content=str(result), role=self.profile, cause_by=type(todo))
        self._rc.memory.add(msg)
        return msg
```
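In `by_order` mode the Role runs its Actions in registration order, and each `_act` call takes the latest message in memory as its input and stores its own result as a new message. The toy model below imitates only that hand-off, with no MetaGPT dependency: a plain list stands in for memory, and two hypothetical stub coroutines stand in for `CrawlOSSTrending` and `AnalysisOSSTrending`. It illustrates the data flow, not MetaGPT's actual scheduler.

```python
import asyncio
from typing import Awaitable, Callable

StepFn = Callable[[str], Awaitable[str]]


async def crawl_stub(url: str) -> str:
    # Stand-in for CrawlOSSTrending: pretend one repo was scraped from `url`
    return str([{"name": "owner/project1", "source": url}])


async def analyze_stub(trending: str) -> str:
    # Stand-in for AnalysisOSSTrending: the real Action sends a prompt to the LLM
    return f"Report based on: {trending}"


async def run_by_order(actions: list[StepFn], first_input: str) -> list[str]:
    memory = [first_input]  # the trigger's message seeds the memory
    for action in actions:  # each action runs once, in order
        latest = memory[-1]  # like self.get_memories(k=1)[0].content
        result = await action(latest)
        memory.append(str(result))  # like self._rc.memory.add(Message(content=str(result)))
    return memory


memory = asyncio.run(run_by_order([crawl_stub, analyze_stub], "https://github.com/trending"))
print(memory[-1])
```

This is why the crawler's list of dicts reaches the analysis step as a string: `_act` wraps every result in `Message(content=str(result))`.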
Trigger implementation code:

```python
# Trigger
import time
from typing import Optional

from aiocron import crontab
from pydantic import BaseModel, Field
from pytz import BaseTzInfo

from metagpt.logs import logger
from metagpt.schema import Message


class OssInfo(BaseModel):
    url: str
    timestamp: float = Field(default_factory=time.time)


class GithubTrendingCronTrigger:
    def __init__(self, spec: str, tz: Optional[BaseTzInfo] = None, url: str = "https://github.com/trending") -> None:
        self.crontab = crontab(spec, tz=tz)
        self.url = url

    def __aiter__(self):
        return self

    async def __anext__(self):
        await self.crontab.next()  # wait until the next time matching `spec`
        logger.info(OssInfo(url=self.url))
        return Message(self.url)
        # return Message(self.url, OssInfo(url=self.url))
```
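`GithubTrendingCronTrigger` can be consumed with `async for` because it implements the async iterator protocol: `__aiter__` returns the object itself and each `__anext__` waits for the next tick before returning a value. Here is a stdlib-only sketch of the same protocol. The `IntervalTrigger` class is hypothetical: a fixed `asyncio.sleep` stands in for `crontab.next()`, and a `max_ticks` limit makes the example terminate (the real trigger never stops on its own).

```python
import asyncio


class IntervalTrigger:
    """Hypothetical stand-in for GithubTrendingCronTrigger: fires every `interval` seconds."""

    def __init__(self, url: str, interval: float, max_ticks: int) -> None:
        self.url = url
        self.interval = interval
        self.remaining = max_ticks

    def __aiter__(self):
        return self

    async def __anext__(self) -> str:
        if self.remaining <= 0:
            raise StopAsyncIteration  # ends the `async for` loop
        self.remaining -= 1
        await asyncio.sleep(self.interval)  # the real trigger awaits self.crontab.next()
        return self.url  # the real trigger wraps this in Message(self.url)


async def collect() -> list[str]:
    return [url async for url in IntervalTrigger("https://github.com/trending", 0.01, 3)]


ticks = asyncio.run(collect())
print(ticks)
```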
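Here we use WxPusher, which needs `WXPUSHER_TOKEN` and `WXPUSHER_UIDS`.
`WXPUSHER_TOKEN` is WxPusher's APP_TOKEN; see the official documentation for how to obtain the appToken.
`WXPUSHER_UIDS` holds user UIDs, which you can find on the application management page under "User Management → User List". To send to several users, separate their UIDs with commas.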
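Because `WXPUSHER_UIDS` is a single comma-separated string, a plain `.split(",")` keeps any spaces typed after the commas and produces an empty entry for a trailing comma. A small hypothetical helper like the one below could be used in place of `WXPUSHER_UIDS.split(",")`; the `UID_...` values are placeholders, not real IDs.

```python
import os


def parse_uids(raw: str) -> list[str]:
    """Split a comma-separated UID string, trimming spaces and skipping blank entries."""
    return [uid.strip() for uid in raw.split(",") if uid.strip()]


# Reading the value from an environment variable instead of hard-coding it
uids = parse_uids(os.environ.get("WXPUSHER_UIDS", "UID_aaa, UID_bbb,"))
print(uids)
```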
```python
# callback
from typing import Optional

import aiohttp

from metagpt.schema import Message

WXPUSHER_TOKEN = "<your WxPusher APP_TOKEN>"
WXPUSHER_UIDS = "<your UID(s), comma-separated>"


class WxPusherClient:
    def __init__(self, token: Optional[str] = None, base_url: str = "http://wxpusher.zjiecode.com"):
        self.base_url = base_url
        self.token = token or WXPUSHER_TOKEN
        # self.token = token or os.environ["WXPUSHER_TOKEN"]

    async def send_message(
        self,
        content,
        summary: Optional[str] = None,
        content_type: int = 1,
        topic_ids: Optional[list[int]] = None,
        uids: Optional[list[str]] = None,  # WxPusher UIDs are strings such as "UID_xxx"
        verify: bool = False,
        url: Optional[str] = None,
    ):
        payload = {
            "appToken": self.token,
            "content": content,
            "summary": summary,
            "contentType": content_type,
            "topicIds": topic_ids or [],
            "uids": uids or WXPUSHER_UIDS.split(","),
            # "uids": uids or os.environ["WXPUSHER_UIDS"].split(","),
            "verifyPay": verify,
            "url": url,
        }
        api_url = f"{self.base_url}/api/send/message"  # separate name so the `url` argument is not shadowed
        return await self._request("POST", api_url, json=payload)

    async def _request(self, method, url, **kwargs):
        async with aiohttp.ClientSession() as session:
            # ssl=False disables certificate verification
            async with session.request(method, url, **kwargs, ssl=False) as response:
                response.raise_for_status()
                return await response.json()


async def wxpusher_callback(msg: Message):
    client = WxPusherClient()
    await client.send_message(msg.content, content_type=3)  # 3 = Markdown
```
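In the `config/key.yaml` configuration file, add your own proxy server to work around network problems: `GLOBAL_PROXY: http://127.0.0.1:8118` (change this to your own proxy server's address). `CrawlOSSTrending` reads this value as `CONFIG.global_proxy` and passes it to aiohttp.

On Windows, you can find the proxy address and port under Control Panel → Network and Internet → Internet Options → Connections → LAN settings. Be careful to copy your own port number; otherwise you will get a "connection refused" error.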
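A "connection refused" error usually means nothing is listening at the configured host and port. Before starting the agent, a quick standard-library check like the one below can confirm that the address in `GLOBAL_PROXY` is reachable. `proxy_reachable` is a hypothetical helper; it only verifies that a TCP connection opens and does not check that the service is actually an HTTP proxy.

```python
import socket
from urllib.parse import urlsplit


def proxy_reachable(proxy_url: str, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to the proxy's host:port succeeds."""
    parts = urlsplit(proxy_url)
    host, port = parts.hostname, parts.port
    if host is None or port is None:
        raise ValueError(f"proxy URL needs an explicit host and port: {proxy_url!r}")
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # covers ConnectionRefusedError and timeouts
        return False


print(proxy_reachable("http://127.0.0.1:8118"))
```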
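Possible angles for the analysis:
- Programming language trends: look at the languages used by projects on the Trending list to see which ones are currently most popular in the developer community.
- Project types and uses: classify the projects on the list and note what each one is actually for.
- Community activity: check each project's star count and number of contributors.
- Emerging technologies and tools: watch for new projects and emerging technologies to keep up with current technical trends.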
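Some of these angles can be pre-computed from the crawler's output before anything is sent to the LLM. The sketch below assumes repository dicts shaped like the ones `CrawlOSSTrending` returns (a `language` string and a `today_stars` string such as `"1,234 stars today"`), counts languages, and ranks projects by stars gained today. The helper names and sample records are made up for illustration.

```python
import re
from collections import Counter


def stars_today(text) -> int:
    """Turn strings like '1,234 stars today' into 1234; None or no digits gives 0."""
    match = re.search(r"\d[\d,]*", text or "")
    return int(match.group().replace(",", "")) if match else 0


def summarize(repos: list[dict]) -> dict:
    languages = Counter(r["language"] for r in repos if r.get("language"))
    ranked = sorted(repos, key=lambda r: stars_today(r.get("today_stars")), reverse=True)
    return {
        "top_languages": languages.most_common(3),
        "fastest_growing": [r["name"] for r in ranked[:3]],
    }


sample = [
    {"name": "owner/a", "language": "Python", "today_stars": "1,024 stars today"},
    {"name": "owner/b", "language": "TypeScript", "today_stars": "87 stars today"},
    {"name": "owner/c", "language": "Python", "today_stars": None},
]
print(summarize(sample))
```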
````python
TRENDING_ANALYSIS_PROMPT = """# Requirements
You are a GitHub Trending analyst, aiming to provide users with insightful and personalized recommendations based on the latest GitHub trends. Based on the context, fill in the following missing information and generate engaging, informative titles, ensuring users discover repositories aligned with their interests. Remember to write the report in Chinese.
# The title about Today's GitHub Trending
## Today's Trends: Uncover the Hottest GitHub Projects Today! Explore the trending programming languages and discover key domains capturing developers' attention. From ** to **, witness the top projects like never before.
## The Trends Categories: Dive into Today's GitHub Trending Domains! Explore featured projects in domains such as ** and **. Get a quick overview of each project, including programming languages, stars, and more.
## Highlights of the List: Spotlight noteworthy projects on GitHub Trending, including new tools, innovative projects, and rapidly gaining popularity, focusing on delivering distinctive and attention-grabbing content for users.
---
# Format Example
```
# [Title]
## Today's Trends
Today, ** and ** continue to dominate as the most popular programming languages. Key areas of interest include **, ** and **.
The top popular projects are Project1 and Project2.
## The Trends Categories
1. Generative AI
    - [Project1](https://github/xx/project1): [detail of the project, such as star total and today, language, ...]
    - [Project2](https://github/xx/project2): ...
...
## Highlights of the List
1. [Project1](https://github/xx/project1): [provide specific reasons why this project is recommended].
...
```
---
# Github Trending
{trending}
"""
````
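`AnalysisOSSTrending` fills `{trending}` with the string form of the crawler's list of dicts. One possible refinement, sketched here as an assumption rather than the method used in this post, is to render each repository as a single Markdown line first, which is easier for the model to read and usually shorter. Note that `str.format` treats every other `{` or `}` in the template as a placeholder too, so any literal braces added to the prompt later would need to be doubled as `{{` and `}}`.

```python
def render_trending(repos: list[dict]) -> str:
    """Hypothetical helper: one Markdown bullet per repo for the {trending} slot."""
    lines = []
    for r in repos:
        details = ", ".join(
            f"{key}: {r[key]}" for key in ("language", "stars", "forks", "today_stars") if r.get(key)
        )
        desc = r.get("description") or "no description"
        lines.append(f"- [{r['name']}]({r['url']}): {desc} ({details})")
    return "\n".join(lines)


TEMPLATE = "# Github Trending\n{trending}\n"  # trimmed-down stand-in for TRENDING_ANALYSIS_PROMPT
repos = [{"name": "owner/a", "url": "https://github.com/owner/a", "description": "Demo",
          "language": "Python", "stars": "1,000", "forks": "50", "today_stars": "10 stars today"}]
print(TEMPLATE.format(trending=render_trending(repos)))
```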
Method 1: to trigger the task at a specific time every day, change the `spec` parameter of the `main` function to a suitable cron expression:
```python
# main function
async def main(spec: str = "0 9 * * *", discord: bool = False, wxpusher: bool = True):
    ...
```
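A standard cron `spec` has five space-separated fields: minute, hour, day of month, month, and day of week. So `"0 9 * * *"` fires at 09:00 every day, while the `"* * * * *"` default in the complete script's `main` fires every minute, which is handy for testing but far too frequent for a daily report. The tiny hypothetical helper below just labels the fields of a spec; it does not validate the values.

```python
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def explain_spec(spec: str) -> dict[str, str]:
    """Map a standard 5-field cron spec onto field names (values are not validated)."""
    parts = spec.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"expected 5 fields, got {len(parts)}: {spec!r}")
    return dict(zip(CRON_FIELDS, parts))


print(explain_spec("0 9 * * *"))
```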
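Method 2: because the trigger is built on aiocron, the schedule can be configured very flexibly with cron syntax, for example by also passing a time zone: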
```python
from pytz import timezone

beijing_tz = timezone("Asia/Shanghai")  # time zone for Beijing time
# e.g. inside main(), in place of GithubTrendingCronTrigger(spec)
cron_trigger = GithubTrendingCronTrigger("0 8 * * *", tz=beijing_tz)
```
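Passing `tz` matters because the cron fields are interpreted in that time zone; without it, `"0 8 * * *"` presumably follows the machine's local clock, which on a cloud server is often UTC. Beijing time is UTC+8 with no daylight saving, so 08:00 in Asia/Shanghai is 00:00 UTC. The sketch below uses a fixed UTC+8 offset instead of pytz, and a hypothetical `next_daily_run` function that handles only the daily `"M H * * *"` pattern, not full cron syntax.

```python
from datetime import datetime, timedelta, timezone

# China has no daylight saving time, so a fixed +8 offset matches Asia/Shanghai
BEIJING = timezone(timedelta(hours=8), "Asia/Shanghai")


def next_daily_run(now: datetime, hour: int, minute: int) -> datetime:
    """Next time a 'minute hour * * *' spec fires, strictly after `now`, in now's time zone."""
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= now:
        candidate += timedelta(days=1)
    return candidate


now = datetime(2024, 1, 1, 9, 30, tzinfo=BEIJING)
run = next_daily_run(now, hour=8, minute=0)  # equivalent of "0 8 * * *"
print(run, "=", run.astimezone(timezone.utc))
```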
Complete code:

````python
import os

# Set the API key before importing metagpt so that its CONFIG can pick it up when it loads
os.environ["ZHIPUAI_API_KEY"] = "<your ZhipuAI API key>"

import asyncio
import time
from typing import Any, AsyncGenerator, Awaitable, Callable, Optional

import aiohttp
from aiocron import crontab
from bs4 import BeautifulSoup
from pydantic import BaseModel, Field
from pytz import BaseTzInfo

from metagpt.actions.action import Action
from metagpt.config import CONFIG
from metagpt.logs import logger
from metagpt.roles import Role
from metagpt.schema import Message

WXPUSHER_TOKEN = "<your WxPusher APP_TOKEN>"
WXPUSHER_UIDS = "<your UID(s), comma-separated>"


# Subscription module. It can also be imported with
# `from metagpt.subscription import SubscriptionRunner`; the code is reproduced here for reference.
class SubscriptionRunner(BaseModel):
    """A simple wrapper to manage subscription tasks for different roles using asyncio.

    Example:
        import asyncio
        from metagpt.subscription import SubscriptionRunner
        from metagpt.roles import Searcher
        from metagpt.schema import Message

        async def trigger():
            while True:
                yield Message("the latest news about OpenAI")
                await asyncio.sleep(3600 * 24)

        async def callback(msg: Message):
            print(msg.content)

        async def main():
            pb = SubscriptionRunner()
            await pb.subscribe(Searcher(), trigger(), callback)
            await pb.run()

        asyncio.run(main())
    """

    tasks: dict[Role, asyncio.Task] = Field(default_factory=dict)

    class Config:
        arbitrary_types_allowed = True

    async def subscribe(
        self,
        role: Role,
        trigger: AsyncGenerator[Message, None],
        callback: Callable[[Message], Awaitable[None]],
    ):
        """Subscribes a role to a trigger and sets up a callback to be called with the role's response.

        Args:
            role: The role to subscribe.
            trigger: An asynchronous generator that yields Messages to be processed by the role.
            callback: An asynchronous function to be called with the response from the role.
        """
        loop = asyncio.get_running_loop()

        async def _start_role():
            async for msg in trigger:
                logger.info(f"Trigger fired: {msg.content}")
                resp = await role.run(msg.content)
                await callback(resp)

        self.tasks[role] = loop.create_task(_start_role(), name=f"Subscription-{role}")

    async def unsubscribe(self, role: Role):
        """Unsubscribes a role from its trigger and cancels the associated task.

        Args:
            role: The role to unsubscribe.
        """
        task = self.tasks.pop(role)
        task.cancel()

    async def run(self, raise_exception: bool = True):
        """Runs all subscribed tasks and handles their completion or exception.

        Args:
            raise_exception: If True, re-raise the exception of a failed task;
                otherwise log it and keep running. Defaults to True.

        Raises:
            task.exception: The exception of a failed task, when raise_exception is True.
        """
        while True:
            for role, task in self.tasks.items():
                if task.done():
                    if task.exception():
                        if raise_exception:
                            raise task.exception()
                        logger.opt(exception=task.exception()).error(f"Task {task.get_name()} run error")
                    else:
                        logger.warning(
                            f"Task {task.get_name()} has completed. "
                            "If this is unexpected behavior, please check the trigger function."
                        )
                    self.tasks.pop(role)
                    break
            else:
                await asyncio.sleep(1)


# Action implementations
TRENDING_ANALYSIS_PROMPT = """# Requirements
You are a GitHub Trending analyst, aiming to provide users with insightful and personalized recommendations based on the latest GitHub trends. Based on the context, fill in the following missing information and generate engaging, informative titles, ensuring users discover repositories aligned with their interests. Remember to write the report in Chinese.
# The title about Today's GitHub Trending
## Today's Trends: Uncover the Hottest GitHub Projects Today! Explore the trending programming languages and discover key domains capturing developers' attention. From ** to **, witness the top projects like never before.
## The Trends Categories: Dive into Today's GitHub Trending Domains! Explore featured projects in domains such as ** and **. Get a quick overview of each project, including programming languages, stars, and more.
## Highlights of the List: Spotlight noteworthy projects on GitHub Trending, including new tools, innovative projects, and rapidly gaining popularity, focusing on delivering distinctive and attention-grabbing content for users.
---
# Format Example
```
# [Title]
## Today's Trends
Today, ** and ** continue to dominate as the most popular programming languages. Key areas of interest include **, ** and **.
The top popular projects are Project1 and Project2.
## The Trends Categories
1. Generative AI
    - [Project1](https://github/xx/project1): [detail of the project, such as star total and today, language, ...]
    - [Project2](https://github/xx/project2): ...
...
## Highlights of the List
1. [Project1](https://github/xx/project1): [provide specific reasons why this project is recommended].
...
```
---
# Github Trending
{trending}
"""


class CrawlOSSTrending(Action):
    async def run(self, url: str = "https://github.com/trending"):
        async with aiohttp.ClientSession() as client:
            # CONFIG.global_proxy comes from GLOBAL_PROXY in config/key.yaml
            async with client.get(url, proxy=CONFIG.global_proxy, ssl=False) as response:
                response.raise_for_status()
                html = await response.text()

        soup = BeautifulSoup(html, 'html.parser')

        repositories = []

        for article in soup.select('article.Box-row'):
            repo_info = {}

            repo_info['name'] = article.select_one('h2 a').text.strip().replace("\n", "").replace(" ", "")
            repo_info['url'] = "https://github.com" + article.select_one('h2 a')['href'].strip()

            # Description
            description_element = article.select_one('p')
            repo_info['description'] = description_element.text.strip() if description_element else None

            # Language
            language_element = article.select_one('span[itemprop="programmingLanguage"]')
            repo_info['language'] = language_element.text.strip() if language_element else None

            # Stars and forks (guarded in case a row lacks one of the links)
            muted_links = article.select('a.Link--muted')
            repo_info['stars'] = muted_links[0].text.strip() if len(muted_links) > 0 else None
            repo_info['forks'] = muted_links[1].text.strip() if len(muted_links) > 1 else None

            # Today's stars
            today_stars_element = article.select_one('span.d-inline-block.float-sm-right')
            repo_info['today_stars'] = today_stars_element.text.strip() if today_stars_element else None

            repositories.append(repo_info)

        return repositories


class AnalysisOSSTrending(Action):
    async def run(self, trending: Any):
        return await self._aask(TRENDING_ANALYSIS_PROMPT.format(trending=trending))


# Role implementation
class OssWatcher(Role):
    # No trailing commas: `name: str = "Codey",` would make the default a tuple
    name: str = "Codey"
    profile: str = "OssWatcher"
    goal: str = "Generate an insightful GitHub Trending analysis report."
    constraints: str = "Only analyze based on the provided GitHub Trending data."

    def __init__(self, **kwargs) -> None:
        super().__init__(**kwargs)
        self._init_actions([CrawlOSSTrending, AnalysisOSSTrending])
        self._set_react_mode(react_mode="by_order")

    async def _act(self) -> Message:
        logger.info(f"{self._setting}: ready to {self._rc.todo}")
        # In "by_order" mode the Actions run in the order registered above:
        # first CrawlOSSTrending, then AnalysisOSSTrending
        todo = self._rc.todo

        msg = self.get_memories(k=1)[0]  # the most recent message is this step's input
        result = await todo.run(msg.content)

        msg = Message(content=str(result))
        # msg = Message(content=str(result), role=self.profile, cause_by=type(todo))
        self._rc.memory.add(msg)
        return msg


# Trigger
class OssInfo(BaseModel):
    url: str
    timestamp: float = Field(default_factory=time.time)


class GithubTrendingCronTrigger:
    def __init__(self, spec: str, tz: Optional[BaseTzInfo] = None, url: str = "https://github.com/trending") -> None:
        self.crontab = crontab(spec, tz=tz)
        self.url = url

    def __aiter__(self):
        return self

    async def __anext__(self):
        await self.crontab.next()  # wait until the next time matching `spec`
        logger.info(OssInfo(url=self.url))
        return Message(self.url)
        # return Message(self.url, OssInfo(url=self.url))


# callback
class WxPusherClient:
    def __init__(self, token: Optional[str] = None, base_url: str = "http://wxpusher.zjiecode.com"):
        self.base_url = base_url
        self.token = token or WXPUSHER_TOKEN
        # self.token = token or os.environ["WXPUSHER_TOKEN"]

    async def send_message(
        self,
        content,
        summary: Optional[str] = None,
        content_type: int = 1,
        topic_ids: Optional[list[int]] = None,
        uids: Optional[list[str]] = None,  # WxPusher UIDs are strings such as "UID_xxx"
        verify: bool = False,
        url: Optional[str] = None,
    ):
        payload = {
            "appToken": self.token,
            "content": content,
            "summary": summary,
            "contentType": content_type,
            "topicIds": topic_ids or [],
            "uids": uids or WXPUSHER_UIDS.split(","),
            # "uids": uids or os.environ["WXPUSHER_UIDS"].split(","),
            "verifyPay": verify,
            "url": url,
        }
        api_url = f"{self.base_url}/api/send/message"  # separate name so the `url` argument is not shadowed
        return await self._request("POST", api_url, json=payload)

    async def _request(self, method, url, **kwargs):
        async with aiohttp.ClientSession() as session:
            # ssl=False disables certificate verification
            async with session.request(method, url, **kwargs, ssl=False) as response:
                response.raise_for_status()
                return await response.json()


async def wxpusher_callback(msg: Message):
    client = WxPusherClient()
    await client.send_message(msg.content, content_type=3)  # 3 = Markdown


# Entry point
async def main(spec: str = "* * * * *", discord: bool = False, wxpusher: bool = True):
    # "* * * * *" fires every minute, which is handy for testing;
    # use e.g. "0 9 * * *" to get one report a day at 09:00.
    # `discord` is kept in the signature, but no Discord callback is wired up in this version.
    callbacks = []

    if wxpusher:
        callbacks.append(wxpusher_callback)

    if not callbacks:
        # Fall back to printing the report when no push channel is enabled
        async def _print(msg: Message):
            print(msg.content)

        callbacks.append(_print)

    async def callback(msg):
        # Deliver the report to every enabled channel concurrently
        await asyncio.gather(*(call(msg) for call in callbacks))

    runner = SubscriptionRunner()
    await runner.subscribe(OssWatcher(), GithubTrendingCronTrigger(spec), callback)
    await runner.run()


if __name__ == "__main__":
    import fire

    fire.Fire(main)
````
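To see how `SubscriptionRunner`, the trigger, and the callback fit together without MetaGPT, aiohttp, or WxPusher, here is a stdlib-only toy. An async generator plays the trigger, a plain coroutine plays `role.run`, and several callbacks are fanned out with `asyncio.gather` as the `callback` in `main` does. All names are hypothetical, and unlike the real runner, this loop ends once the trigger is exhausted.

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable


async def toy_trigger(n: int) -> AsyncIterator[str]:
    for _ in range(n):
        await asyncio.sleep(0)  # the real trigger waits for the next cron tick
        yield "https://github.com/trending"


async def toy_role(content: str) -> str:
    return f"report for {content}"  # the real role crawls, then asks the LLM for an analysis


async def subscribe(
    trigger: AsyncIterator[str],
    role: Callable[[str], Awaitable[str]],
    callbacks: list[Callable[[str], Awaitable[None]]],
) -> None:
    async for msg in trigger:
        resp = await role(msg)
        # Fan the response out to every callback concurrently, like `callback` in main()
        await asyncio.gather(*(cb(resp) for cb in callbacks))


received: list[str] = []


async def to_list(msg: str) -> None:
    received.append(msg)


async def to_stdout(msg: str) -> None:
    print(msg)


asyncio.run(subscribe(toy_trigger(2), toy_role, [to_list, to_stdout]))
```

Using `asyncio.gather` means one slow channel does not delay the others, though an exception in any callback still propagates to the subscription loop.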