当前位置:   article > 正文

MySql --sqlalchemy---mongo示例用法_sqlalchemy连接mongo

sqlalchemy连接mongo
  1. from sqlalchemy.ext.declarative import declarative_base
  2. from sqlalchemy import Column, Integer, String, UniqueConstraint, Index, BigInteger, Date, Text, not_
  3. from sqlalchemy.orm import sessionmaker
  4. from sqlalchemy import create_engine
  5. import mongo_connect
  6. from mongo_connect import MongoDao
  7. from mongo_connect import url,db,coll_name
  8. import time
  9. # from utils.dataetl_config import notice_host, notice_port, notice_user, notice_pwd, notice_db
  10. host="192.168.41.128"
  11. port=3306
  12. user="root"
  13. passwd="mysql"
  14. db="collect_resultdb"
  15. engine = create_engine("mysql+pymysql://%s:%s@%s:%s/%s?charset=utf8&autocommit=true" % (user,passwd,host,port,db),
  16. max_overflow=20,pool_size=10)
  17. # from wechat.models import SystemConst, ResearchReport
  18. Session = sessionmaker(bind=engine)
  19. session = Session()
  20. Base = declarative_base()
  21. class Notice(Base):
  22. """
  23. 该表收录了国内主流券商微信公众号研报内容
  24. """
  25. __tablename__ = 'collect_gtja'
  26. # ID = models.BigIntegerField(primary_key=True)
  27. info_id = Column(String(255), primary_key=True)
  28. datetime = Column(String(64))
  29. title = Column(String(255))
  30. file_name = Column(String(255))
  31. URL = Column(String(255))
  32. source = Column(String(255))
  33. content = Column(Text) # models.CharField(max_length=50, primary_key=True)
  34. file_path = Column(Text)
  35. file_attach = Column(Text)
  36. summary = Column(Text)
  37. stock_name = Column(String(255))
  38. stock_code = Column(String(255))
  39. news_type = Column(String(255))
  40. info_type = Column(String(255))
  41. author = Column(String(255))
  42. image_attach = Column(Text)
  43. status = Column(String(10))
  44. file_id = Column(String(64))
  45. __table_args__ = (
  46. UniqueConstraint('info_id'),
  47. Index('info_id'),
  48. )
  49. def query_by_update_time(self,offset,now,status="1"):
  50. session = Session()
  51. # dt_s = datetime.now().date()
  52. reports = session.query(Notice).filter(
  53. # Notice.datetime.between(offset,now),
  54. Notice.status == status
  55. )
  56. return session,reports
  57. def create(self):
  58. Session = sessionmaker(bind=engine)
  59. session = Session()
  60. session.add(self)
  61. session.commit()
  62. def query_by_id(self,id):
  63. session = Session()
  64. reports = session.query(Notice).filter(
  65. # Notice.datetime.between(offset,now),
  66. Notice.info_id == id
  67. )
  68. return session,reports
  69. def query_by_status(self):
  70. session = Session()
  71. # dt_s = datetime.now().date()
  72. reports = session.query(Notice).filter(
  73. # Notice.datetime.between(offset,now),
  74. Notice.status == 1
  75. )
  76. return session,reports
  77. if __name__ == '__main__':
  78. notice = Notice()
  79. mongodao = MongoDao(mongo_connect.url,mongo_connect.db,mongo_connect.coll_name)
  80. mongodao.collection
  81. se,results = notice.query_by_status()
  82. # print(results,"$"*20)
  83. # print(type(results))
  84. # print(results.values())
  85. # print(results.all())
  86. # for l in results.all():
  87. # # l = results.first()
  88. # print(l.author)
  89. for result in results.all():
  90. spider_data = result
  91. data = {}
  92. data["update_time"] = time.time()
  93. data["title"] = spider_data.title
  94. data["content"] = ""
  95. data["html"] = ""
  96. data["url"] = spider_data.URL
  97. data["source_id"] = ""
  98. data["summary"] = ""
  99. data["content_time"] = spider_data.datetime
  100. # data["info_type"] = InfoType.notice.value
  101. data["source"] = spider_data.source
  102. data["source_channel"] = ""
  103. # data["channel"] = ChannelType.spider.value
  104. data["stock_name"] = spider_data.stock_name
  105. data["stock_code"] = spider_data.stock_code
  106. # data["file_type"] = DataType.pdf.value
  107. # data["keywords"] = spider_data["summary"]
  108. data["author"] = spider_data.author
  109. data["create_time"] = ""
  110. print(data)
  111. # logger.info(data["title"])
  112. mongodao.search_upsert_one(filter={"url": data["url"]},
  113. data=data,
  114. ids={"update_time": 0,
  115. "D_id": 0})

 插入数据

  1. class KsRoomInfo(Base):
  2. __tablename__ = 'kuaishou_room_info'
  3. sys_id = Column(Integer, primary_key=True, autoincrement=True)
  4. roomId = Column(String(64)) # 游戏ID
  5. cityName = Column(String(64))
  6. nickName = Column(String(64))
  7. desc = Column(String(128)) # 比赛ID
  8. fans = Column(String(64))
  9. follow = Column(String(64))
  10. video_count = Column(String(64))
  11. game_caty = Column(String(64))
  12. user_caty = Column(String(64))
  13. sys_time = Column(DateTime, default=datetime.datetime.now)
  14. sys_timeindex = Column(Integer)
  15. def saveData(datas):
  16. session=sessionmaker(ENGINE2)()
  17. session.add(datas)
  18. session.commit()
  19. ks = KsRoomInfo(
  20. # roomId=data["data"]["sensitiveUserInfo"]["kwaiId"],
  21. roomId=room["args1"],
  22. cityName=data["data"]["sensitiveUserInfo"]["cityName"],
  23. nickName=data2["nickName"] if data2["nickName"] else room["nickName"],
  24. fans=data["data"]["sensitiveUserInfo"]["counts"]["fan"],
  25. follow=data["data"]["sensitiveUserInfo"]["counts"]["follow"],
  26. video_count=data["data"]["sensitiveUserInfo"]["counts"]["photo"],
  27. desc=data2["desc"] if data2["desc"] else room["nickName"],
  28. sys_time=sys_time,
  29. sys_timeindex=sys_timeindex,
  30. game_caty=room["game_caty"],
  31. user_caty=room["user_caty"],
  32. )
  33. saveData(ks)
  1. # -*- coding:utf8 -*-
  2. # time: 19-1-18 下午3:56
  3. # Author:zhuhao
  4. # ­*­ coding: utf­8 ­*­
  5. """
  6. """
  7. import json
  8. import os
  9. from io import BytesIO
  10. import sys
  11. import bson
  12. from pymongo import MongoClient, ReturnDocument
  13. # from utils.constants import DataStatus
  14. url = "localhost"
  15. db = "collet"
  16. coll_name = "collect_notice"
  17. class MongoDao(object):
  18. def __init__(self,url,db,coll_name):
  19. self.collection = self._get_coll(url=url, db=db, coll_name=coll_name)
  20. def _get_coll(self,url, db, coll_name):
  21. db = _get_db(url, db)
  22. return db[coll_name]
  23. def _get_connection(self,url):
  24. return MongoClient(url)
  25. def _get_db(self,url, db):
  26. client = _get_connection(url)
  27. return client[db]
  28. def search_one(self, filter={}):
  29. return self.collection.find_one(filter, no_cursor_timeout=True)
  30. def search(self, filter={}):
  31. return self.collection.find(filter, no_cursor_timeout=True)
  32. def insert_one(self,data={}):
  33. self.collection.insert_one(document=data)
  34. def update_one(self, filter={}, data={}, upsert=False):
  35. """
  36. upsert = update + insert
  37. :param filter:
  38. :param data:
  39. :param upsert:
  40. :param collection:
  41. :return:
  42. """
  43. self.collection.update_one(filter=filter, update={"$set": data}, upsert=upsert)
  44. def search_upsert_one(self, filter={}, data={}, ids={}):
  45. """
  46. :param filter:
  47. :param data:
  48. :param ids: {"key1":1,"key2":0} 合并(1)or覆盖(0)
  49. :return:
  50. """
  51. if filter is not {}:
  52. record = self.collection.find_one(filter=filter)
  53. else:
  54. record = None
  55. if record:
  56. for id_key,v in ids.items():
  57. if id_key in record:
  58. if v == 1 and isinstance(data[id_key],record[id_key]):
  59. data[id_key] += record[id_key]
  60. else:
  61. data[id_key] = record[id_key]
  62. self.collection.update_one(filter=filter, update={"$set": data})
  63. else:
  64. # 插入
  65. self.collection.insert_one(document=data)
  66. def insertFile(file_path):
  67. """
  68. insert file whose size < 16M into mongodb
  69. :param file_path:
  70. :return:
  71. """
  72. try:
  73. fin = open(file_path, "rb")
  74. img = fin.read()
  75. data = BytesIO(img)
  76. fin.close()
  77. # conn = MongoClient('10.101.12.23', 27017)
  78. conn = MongoClient('101.132.167.173', 8003)
  79. db = conn.mydb
  80. coll = db.test_set
  81. with open(file_path, 'rb') as myimage:
  82. content = BytesIO(myimage.read())
  83. coll.save(dict(
  84. content=bson.binary.Binary(content.getvalue()),
  85. filename=file_path.split(".")[0],
  86. file_format=file_path.split(".")[1]
  87. ))
  88. except Exception as e:
  89. print(e)
  90. sys.exit(1)
  91. # finally:
  92. # imgput.close() todo
  93. def file_read_binary(plugin_name, file_path=None):
  94. """
  95. 从mongo中读文件,文件为二进制
  96. :return:
  97. """
  98. try:
  99. # fin = open(os.path.join(file_path,plugin_name), "wb")
  100. # img = fin.read()
  101. # data = StringIO(img)
  102. # fin.close()
  103. conn = MongoClient('101.132.167.173', 8003)
  104. db = conn.mydb
  105. files = db.test_set
  106. res = files.find_one({"filename": plugin_name})
  107. fin = open(os.path.join(file_path, plugin_name) + "." + res["file_format"], "wb")
  108. print(res)
  109. content = res['content']
  110. fin.write(content)
  111. # print('sum:', files.count())
  112. # insertimg = imgput.put(data)
  113. # print insertimg
  114. except Exception as e:
  115. print(e)
  116. sys.exit(1)
  117. def _get_connection(url):
  118. return MongoClient(url)
  119. def _get_db(url, db):
  120. client = _get_connection(url)
  121. return client[db]
  122. def get_coll(url, db, coll_name):
  123. db = _get_db(url, db)
  124. return db[coll_name]
  125. def search(url, db, coll_name, filter={}):
  126. collection = get_coll(url=url, db=db, coll_name=coll_name)
  127. return collection.find(filter, no_cursor_timeout=True)
  128. def search_one(url, db, coll_name, filter={}, collection=None):
  129. if collection is None:
  130. collection = get_coll(url=url, db=db, coll_name=coll_name)
  131. return collection.find_one(filter, no_cursor_timeout=True)
  132. def fetch_one(url, db, coll_name, filter={}, ):
  133. collection = get_coll(url=url, db=db, coll_name=coll_name)
  134. # return collection.find_one(filter)
  135. return collection.find_one_and_update(filter,
  136. {"$set": {"status": "1"}},
  137. return_document=ReturnDocument.AFTER)
  138. def get_collection_names(url):
  139. """
  140. 获取mongodb所有数据库对应的collection名
  141. :param url:
  142. :return:
  143. """
  144. client = _get_connection(url)
  145. d = dict((db, [collection for collection in client[db].collection_names()])
  146. for db in client.database_names())
  147. return d
  148. def update_one(url, db, coll_name, filter={}, data={}, upsert=False,collection=None):
  149. if collection is None:
  150. collection = get_coll(url=url, db=db, coll_name=coll_name)
  151. else:
  152. collection.update_one(filter=filter, update={"$set": data}, upsert=upsert)
  153. # db.stu.update_one({"age": 18}, {"$set": {"age": 100}})
  154. def insert_one(url, db, coll_name, data={}):
  155. collection = get_coll(url=url, db=db, coll_name=coll_name)
  156. collection.insert_one(document=data)
  157. def insert_or_update_one(url, db, coll_name, filter={}, data={},id_key=None):
  158. collection = get_coll(url=url, db=db, coll_name=coll_name)
  159. if filter is not {}:
  160. record = collection.find_one(filter=filter)
  161. else:
  162. record = None
  163. if record:
  164. # 更新
  165. if "update_time" in record:
  166. data["update_time"] = record["update_time"]
  167. if id_key is not None:
  168. data[id_key] = record[id_key]
  169. collection.update_one(filter=filter, update={"$set": data})
  170. else:
  171. # 插入
  172. collection.insert_one(document=data)
  173. def save_or_update_one(url, db, coll_name, filter={}, data={}):
  174. collection = get_coll(url=url, db=db, coll_name=coll_name)
  175. record = collection.find_one(filter=filter)
  176. if record:
  177. # 更新
  178. data["D_id"] = record["D_id"]
  179. collection.update_one(filter=filter, update={"$set": data})
  180. else:
  181. # 插入
  182. collection.insert_one(document=data)
  183. if __name__ == "__main__":
  184. # print (search(url = 'mongodb://101.132.167.173:8013/',db="mydb",coll_name="test_clean")[0]["result"])
  185. print(get_collection_names(url='mongodb://101.132.167.173:8014/')["pyspider_resultdb1"])
  186. # result = fetch_one(url='mongodb://101.132.167.173:8014/',
  187. # db="mydb",
  188. # coll_name="test_clean",
  189. # filter={'status': {'$exists': 1}})
  190. # print(result)
  191. # if result:
  192. # print("00")
  193. # else:
  194. # print("11")

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/菜鸟追梦旅行/article/detail/613184
推荐阅读
相关标签
  

闽ICP备14008679号