Python 工具
1.1 GitHub Trending爬取
在配置文件中添加自己的代理服务器配置,以解决网络访问问题:`GLOBAL_PROXY: <代理服务器地址>`(改成自己的代理服务器地址)
设置定时触发时间有两种方法。方法一:修改 main 函数中的 spec 参数,将其设为合适的 cron 表达式(例如 "0 9 * * *" 表示每天 9:00 触发):
# Excerpt of the main() entry point: `spec` is the cron expression to adjust.
async def main(spec: str = "0 9 * * *", discord: bool = False, wxpusher: bool = True):
    ...
方法二:基于 aiocron,在创建触发器时通过 tz 参数指定时区(例如北京时间):
from pytz import timezone

# Time zone object for Beijing time.
beijing_tz = timezone('Asia/Shanghai')
# Build the trigger with an explicit time zone so the cron spec "0 8 * * *"
# is evaluated in Beijing time.
cron_trigger = GithubTrendingCronTrigger("0 8 * * *", tz=beijing_tz)
import os

# Provide the ZhipuAI API key through the environment. setdefault() keeps a key
# that is already exported instead of overwriting it with the placeholder below.
os.environ.setdefault("ZHIPUAI_API_KEY", "此处填写你的")
- from metagpt.environment import Environment
- import asyncio
- import os
- import time
- from typing import Any, AsyncGenerator, Awaitable, Callable, Optional
- import aiohttp
- from aiocron import crontab
- from bs4 import BeautifulSoup
- from pydantic import BaseModel, Field
- from pytz import BaseTzInfo
- from metagpt.actions.action import Action
- from metagpt.config import CONFIG
- from metagpt.logs import logger
- from metagpt.roles import Role
- from metagpt.schema import Message
# WxPusher credentials. The WXPUSHER_TOKEN / WXPUSHER_UIDS environment variables
# take precedence so that real secrets can stay out of the source file; the
# literals are placeholders to fill in when the variables are not set.
WXPUSHER_TOKEN = os.environ.get("WXPUSHER_TOKEN", "此处填写你的")
# Comma-separated list of receiver UIDs.
WXPUSHER_UIDS = os.environ.get("WXPUSHER_UIDS", "此处填写你的")
# Subscription module. It can also be imported with
# `from metagpt.subscription import SubscriptionRunner`; the code is reproduced here for reference.
class SubscriptionRunner(BaseModel):
    """Manage asyncio-based subscription tasks for a set of roles.

    Every subscribed role gets one background task. The task feeds the role
    with the messages produced by a trigger and passes each response of the
    role to a callback.

    Example:
        import asyncio
        from metagpt.subscription import SubscriptionRunner
        from metagpt.roles import Searcher
        from metagpt.schema import Message

        async def trigger():
            while True:
                yield Message("the latest news about OpenAI")
                await asyncio.sleep(3600 * 24)

        async def callback(msg: Message):
            print(msg.content)

        async def main():
            pb = SubscriptionRunner()
            await pb.subscribe(Searcher(), trigger(), callback)
            await pb.run()

        asyncio.run(main())
    """

    # Maps each subscribed role to the task that drives it.
    tasks: dict[Role, asyncio.Task] = Field(default_factory=dict)

    class Config:
        arbitrary_types_allowed = True

    async def subscribe(
        self,
        role: Role,
        trigger: AsyncGenerator[Message, None],
        callback: Callable[[Message], Awaitable[None]],
    ):
        """Start a background task that connects ``trigger``, ``role`` and ``callback``.

        Args:
            role: The role that processes every triggered message.
            trigger: Asynchronous iterator yielding the messages to process.
            callback: Coroutine function awaited with each response of the role.
        """

        async def _pump():
            # Forward the content of each triggered message to the role and
            # hand the role's answer to the callback.
            async for incoming in trigger:
                logger.info("===log===" * 3)
                logger.info(incoming)
                logger.info(incoming.content)
                answer = await role.run(incoming.content)
                await callback(answer)

        running_loop = asyncio.get_running_loop()
        self.tasks[role] = running_loop.create_task(_pump(), name=f"Subscription-{role}")

    async def unsubscribe(self, role: Role):
        """Remove ``role`` from the runner and cancel its task.

        Args:
            role: The role to unsubscribe.
        """
        self.tasks.pop(role).cancel()

    async def run(self, raise_exception: bool = True):
        """Supervise the subscribed tasks forever, reporting the ones that finish.

        Args:
            raise_exception: When True, an exception stored in a finished task
                is re-raised; when False it is logged and the task is dropped.

        Raises:
            BaseException: The exception of a failed task, if ``raise_exception`` is True.
        """
        while True:
            # Pick the first finished task, if any; handle one task per pass.
            finished = next(
                ((who, job) for who, job in self.tasks.items() if job.done()),
                None,
            )
            if finished is None:
                # Nothing finished: poll again in a second.
                await asyncio.sleep(1)
                continue

            role, task = finished
            error = task.exception()
            if error:
                if raise_exception:
                    raise error
                logger.opt(exception=error).error(f"Task {task.get_name()} run error")
            else:
                logger.warning(
                    f"Task {task.get_name()} has completed. "
                    "If this is unexpected behavior, please check the trigger function."
                )
            self.tasks.pop(role)
# Action implementations
# Prompt template for the trend-analysis action; `{trending}` is filled in with
# the crawled GitHub Trending data via str.format().
TRENDING_ANALYSIS_PROMPT = """# Requirements
您是GitHub趋势分析师,旨在根据最新的GitHub趋势为用户提供深入见解和个性化建议。根据上下文填写以下缺失信息,生成引人入胜且信息丰富的标题,确保用户发现与其兴趣相符的存储库,记得要中文。
# The title about Today's GitHub Trending
## Today's Trends: Uncover the Hottest GitHub Projects Today! Explore the trending programming languages and discover key domains capturing developers' attention. From ** to **, witness the top projects like never before.
## The Trends Categories: Dive into Today's GitHub Trending Domains! Explore featured projects in domains such as ** and **. Get a quick overview of each project, including programming languages, stars, and more.
## Highlights of the List: Spotlight noteworthy projects on GitHub Trending, including new tools, innovative projects, and rapidly gaining popularity, focusing on delivering distinctive and attention-grabbing content for users.
---
# Format Example
```
# [Title]
## Today's Trends
Today, ** and ** continue to dominate as the most popular programming languages. Key areas of interest include **, ** and **.
The top popular projects are Project1 and Project2.
## The Trends Categories
1. Generative AI
- [Project1](https://github/xx/project1): [detail of the project, such as star total and today, language, ...]
- [Project2](https://github/xx/project2): ...
...
## Highlights of the List
1. [Project1](https://github/xx/project1): [provide specific reasons why this project is recommended].
...
```
---
# Github Trending
{trending}
"""
class CrawlOSSTrending(Action):
    """Action that downloads a GitHub Trending page and extracts its repository list."""

    async def run(self, url: str = "https://github.com/trending"):
        """Fetch ``url`` and parse the repositories listed on it.

        Args:
            url: Address of the trending page to crawl.

        Returns:
            A list of dicts, one per ``article.Box-row`` element that has a
            title link, with the keys ``name``, ``url``, ``description``,
            ``language``, ``stars``, ``forks`` and ``today_stars``. A value is
            ``None`` when the corresponding element is missing from the page.
        """
        async with aiohttp.ClientSession() as client:
            # NOTE(review): ssl=False turns off TLS certificate verification;
            # presumably kept so the request works through a local proxy — confirm.
            async with client.get(url, proxy=CONFIG.global_proxy, ssl=False) as response:
                response.raise_for_status()
                html = await response.text()

        soup = BeautifulSoup(html, "html.parser")
        repositories = []
        for article in soup.select("article.Box-row"):
            title_link = article.select_one("h2 a")
            if title_link is None:
                # Without the title link there is neither a name nor a URL to report.
                logger.warning("Skipping a trending entry without a title link")
                continue

            description_element = article.select_one("p")
            language_element = article.select_one('span[itemprop="programmingLanguage"]')
            today_stars_element = article.select_one("span.d-inline-block.float-sm-right")
            # The first muted link is read as the star count and the second as
            # the fork count; either may be absent, so guard the indexing.
            muted_links = article.select("a.Link--muted")

            repositories.append(
                {
                    "name": title_link.text.strip().replace("\n", "").replace(" ", ""),
                    "url": "https://github.com" + title_link["href"].strip(),
                    "description": description_element.text.strip() if description_element else None,
                    "language": language_element.text.strip() if language_element else None,
                    "stars": muted_links[0].text.strip() if len(muted_links) > 0 else None,
                    "forks": muted_links[1].text.strip() if len(muted_links) > 1 else None,
                    "today_stars": today_stars_element.text.strip() if today_stars_element else None,
                }
            )
        return repositories
class AnalysisOSSTrending(Action):
    """Action that turns crawled trending data into an analysis prompt and submits it."""

    async def run(self, trending: Any):
        """Fill the analysis prompt with ``trending`` and return the answer of ``self._aask``.

        Args:
            trending: The trending data to embed in the prompt; it is inserted
                with ``str.format``.
        """
        prompt = TRENDING_ANALYSIS_PROMPT.format(trending=trending)
        return await self._aask(prompt)
# Role implementation
class OssWatcher(Role):
    """Role that crawls GitHub Trending and then writes an analysis of the result."""

    # Plain strings: a trailing comma after a default would silently turn it
    # into a one-element tuple.
    name: str = "Codey"
    profile: str = "OssWatcher"
    goal: str = "Generate an insightful GitHub Trending analysis report."
    constraints: str = "Only analyze based on the provided GitHub Trending data."

    def __init__(self, **kwargs) -> None:
        super().__init__(**kwargs)
        self._init_actions([CrawlOSSTrending, AnalysisOSSTrending])
        # Run the registered actions one after another, in the order given above.
        self._set_react_mode(react_mode="by_order")

    async def _act(self) -> Message:
        """Run the current action on the latest memory entry and store its result.

        Returns:
            A Message whose content is the string form of the action's result.
        """
        logger.info(f"{self._setting}: ready to {self._rc.todo}")
        # Actions are chosen by order under the hood:
        # todo is first CrawlOSSTrending, then AnalysisOSSTrending.
        todo = self._rc.todo

        msg = self.get_memories(k=1)[0]  # the most recent message
        result = await todo.run(msg.content)

        msg = Message(content=str(result))
        self._rc.memory.add(msg)
        return msg
# Trigger
class OssInfo(BaseModel):
    """Record holding a URL together with the time at which the record was created."""

    # The URL this record refers to.
    url: str
    # Defaults to the value of time.time() at instantiation.
    timestamp: float = Field(default_factory=time.time)
class GithubTrendingCronTrigger:
    """Asynchronous iterator that emits a Message with the target URL on a cron schedule."""

    def __init__(
        self,
        spec: str,
        tz: Optional[BaseTzInfo] = None,
        url: str = "https://github.com/trending",
    ) -> None:
        """Store the URL and create the aiocron schedule.

        Args:
            spec: Cron expression passed to ``crontab``.
            tz: Optional time zone passed to ``crontab``.
            url: URL placed in every emitted message.
        """
        self.url = url
        self.crontab = crontab(spec, tz=tz)

    def __aiter__(self):
        # The trigger is its own asynchronous iterator.
        return self

    async def __anext__(self):
        """Wait for the next cron tick, log the target and return it as a Message."""
        await self.crontab.next()
        target = self.url
        logger.info(target)
        logger.info(OssInfo(url=target))
        logger.info(Message(target))
        return Message(target)
# Callback
class WxPusherClient:
    """Small asynchronous client for the WxPusher message-sending HTTP endpoint."""

    def __init__(self, token: Optional[str] = None, base_url: str = "http://wxpusher.zjiecode.com"):
        """Remember the service address and the application token.

        Args:
            token: Application token; falls back to ``WXPUSHER_TOKEN`` when empty.
            base_url: Root URL of the WxPusher service.
        """
        self.base_url = base_url
        self.token = token or WXPUSHER_TOKEN

    async def send_message(
        self,
        content,
        summary: Optional[str] = None,
        content_type: int = 1,
        topic_ids: Optional[list[int]] = None,
        uids: Optional[list[str]] = None,
        verify: bool = False,
        url: Optional[str] = None,
    ):
        """POST one message to ``/api/send/message`` and return the decoded JSON reply.

        Args:
            content: Body of the message.
            summary: Optional summary sent as ``summary``.
            content_type: Value sent as ``contentType``.
            topic_ids: Topic ids sent as ``topicIds``; an empty list when omitted.
            uids: Receiver UIDs; when omitted, the comma-separated
                ``WXPUSHER_UIDS`` value is used.
            verify: Value sent as ``verifyPay``.
            url: Optional link sent as ``url`` in the payload.
        """
        if not uids:
            # Tolerate spaces around the commas and skip empty entries.
            uids = [uid.strip() for uid in WXPUSHER_UIDS.split(",") if uid.strip()]

        payload = {
            "appToken": self.token,
            "content": content,
            "summary": summary,
            "contentType": content_type,
            "topicIds": topic_ids or [],
            "uids": uids,
            "verifyPay": verify,
            "url": url,
        }
        # A separate name keeps the `url` argument (the link inside the message)
        # distinct from the API endpoint being called.
        endpoint = f"{self.base_url}/api/send/message"
        return await self._request("POST", endpoint, json=payload)

    async def _request(self, method, url, **kwargs):
        """Send one HTTP request and return the JSON-decoded response body.

        The response's ``raise_for_status()`` is called before decoding.
        """
        async with aiohttp.ClientSession() as session:
            # NOTE(review): ssl=False disables TLS certificate verification — confirm it is intended.
            async with session.request(method, url, **kwargs, ssl=False) as response:
                response.raise_for_status()
                return await response.json()
async def wxpusher_callback(msg: Message):
    """Send the content of ``msg`` through a freshly created WxPusherClient."""
    pusher = WxPusherClient()
    # content_type=3 presumably selects Markdown rendering — confirm against the WxPusher API docs.
    await pusher.send_message(msg.content, content_type=3)
# Program entry point
async def main(spec: str = "* * * * *", discord: bool = False, wxpusher: bool = True):
    """Subscribe an OssWatcher to a cron trigger and run the subscription loop.

    Args:
        spec: Cron expression for the trigger; the default fires every minute.
        discord: Accepted but not used anywhere in this function.
        wxpusher: When True, reports are delivered through ``wxpusher_callback``;
            otherwise they are printed to standard output.
    """

    async def _print(msg: Message):
        print(msg.content)

    # Printing is the fallback when no push channel is enabled.
    callbacks = [wxpusher_callback] if wxpusher else [_print]

    async def callback(msg):
        # Deliver the message to every registered callback concurrently.
        await asyncio.gather(*(deliver(msg) for deliver in callbacks))

    runner = SubscriptionRunner()
    trigger = GithubTrendingCronTrigger(spec)
    await runner.subscribe(OssWatcher(), trigger, callback)
    await runner.run()
if __name__ == "__main__":
    # Imported here so that `fire` is only needed when the file runs as a script.
    from fire import Fire

    # Expose main() and its parameters as a command-line interface.
    Fire(main)
