# (page artifacts removed: like/dislike widget text from the scraped source)
- from sqlalchemy.ext.declarative import declarative_base
- from sqlalchemy import Column, Integer, String, UniqueConstraint, Index, BigInteger, Date, Text, not_
- from sqlalchemy.orm import sessionmaker
- from sqlalchemy import create_engine
- import mongo_connect
- from mongo_connect import MongoDao
- from mongo_connect import url,db,coll_name
- import time
-
- # from utils.dataetl_config import notice_host, notice_port, notice_user, notice_pwd, notice_db
-
# MySQL connection settings for the collector result database.
# NOTE(review): credentials are hard-coded here; the commented-out
# utils.dataetl_config import above suggests they belong in config.
host="192.168.41.128"
port=3306
user="root"
passwd="mysql"
# NOTE(review): this rebinding shadows the `db` name imported from
# mongo_connect above; later code therefore reads mongo_connect.db explicitly.
db="collect_resultdb"

# Pooled engine; autocommit is enabled via the URL query string.
engine = create_engine("mysql+pymysql://%s:%s@%s:%s/%s?charset=utf8&autocommit=true" % (user,passwd,host,port,db),
                       max_overflow=20,pool_size=10)

# from wechat.models import SystemConst, ResearchReport
Session = sessionmaker(bind=engine)  # module-level session factory
session = Session()

Base = declarative_base()
-
class Notice(Base):
    """ORM model for research-report notices collected from brokerage
    WeChat public accounts (table ``collect_gtja``).

    Query helpers return ``(session, query)`` so the caller can iterate the
    results and close the session afterwards.
    """
    __tablename__ = 'collect_gtja'

    info_id = Column(String(255), primary_key=True)  # unique notice id
    datetime = Column(String(64))   # publication time, stored as text
    title = Column(String(255))
    file_name = Column(String(255))
    URL = Column(String(255))
    source = Column(String(255))

    content = Column(Text)
    file_path = Column(Text)
    file_attach = Column(Text)
    summary = Column(Text)

    stock_name = Column(String(255))
    stock_code = Column(String(255))
    news_type = Column(String(255))
    info_type = Column(String(255))
    author = Column(String(255))

    image_attach = Column(Text)
    status = Column(String(10))     # processing status flag ("1" = ready)

    file_id = Column(String(64))

    # BUG FIX: ``Index('info_id')`` named an index but listed no columns,
    # which SQLAlchemy rejects at table-creation time; an Index must name
    # the column(s) it covers. The UniqueConstraint is redundant with the
    # primary key but kept for backward compatibility with existing DDL.
    __table_args__ = (
        UniqueConstraint('info_id'),
        Index('ix_collect_gtja_info_id', 'info_id'),
    )

    def query_by_update_time(self, offset, now, status="1"):
        """Return ``(session, query)`` of notices with the given status.

        NOTE(review): ``offset``/``now`` are currently unused because the
        datetime range filter is commented out; they are kept for interface
        compatibility.
        """
        session = Session()
        reports = session.query(Notice).filter(
            # Notice.datetime.between(offset, now),
            Notice.status == status
        )
        return session, reports

    def create(self):
        """Insert this instance using a fresh session and commit."""
        # Reuse the module-level session factory instead of rebuilding it.
        session = Session()
        session.add(self)
        session.commit()

    def query_by_id(self, id):
        """Return ``(session, query)`` matching a single ``info_id``."""
        session = Session()
        reports = session.query(Notice).filter(
            Notice.info_id == id
        )
        return session, reports

    def query_by_status(self, status="1"):
        """Return ``(session, query)`` of notices with the given status.

        BUG FIX: the status column is ``String(10)``, so compare against the
        string "1" (as ``query_by_update_time`` does) instead of int 1,
        which relied on implicit MySQL type coercion.
        """
        session = Session()
        reports = session.query(Notice).filter(
            Notice.status == status
        )
        return session, reports
-
if __name__ == '__main__':
    # Sync every "ready" (status == "1") notice from MySQL into MongoDB,
    # upserting on the notice URL.
    notice = Notice()
    mongodao = MongoDao(mongo_connect.url, mongo_connect.db, mongo_connect.coll_name)
    # BUG FIX: removed the bare no-op expression ``mongodao.collection``.
    se, results = notice.query_by_status()
    try:
        for spider_data in results.all():
            # Map the ORM row onto the flat document schema used in Mongo.
            data = {
                "update_time": time.time(),
                "title": spider_data.title,
                "content": "",
                "html": "",
                "url": spider_data.URL,
                "source_id": "",
                "summary": "",
                "content_time": spider_data.datetime,
                "source": spider_data.source,
                "source_channel": "",
                "stock_name": spider_data.stock_name,
                "stock_code": spider_data.stock_code,
                "author": spider_data.author,
                "create_time": "",
            }
            print(data)
            # Upsert keyed on URL; an ids value of 0 keeps the value already
            # stored in Mongo for that field (see MongoDao.search_upsert_one).
            mongodao.search_upsert_one(filter={"url": data["url"]},
                                       data=data,
                                       ids={"update_time": 0,
                                            "D_id": 0})
    finally:
        # BUG FIX: the query session was never closed; release it here.
        se.close()
-
# --- 插入数据 (insert data): Kuaishou room-info fragment follows ---
class KsRoomInfo(Base):
    """ORM model for a Kuaishou streamer/room info snapshot
    (table ``kuaishou_room_info``).

    NOTE(review): ``DateTime`` and the ``datetime`` module used below are
    not imported in the visible file header — confirm the original file
    imports them.
    """
    __tablename__ = 'kuaishou_room_info'
    sys_id = Column(Integer, primary_key=True, autoincrement=True)  # surrogate key
    roomId = Column(String(64))  # game ID (original comment; presumably the room/streamer id — verify)
    cityName = Column(String(64))
    nickName = Column(String(64))
    desc = Column(String(128))  # match ID (original comment; the caller stores a profile description here)
    fans = Column(String(64))            # follower count (stored as text)
    follow = Column(String(64))          # following count (stored as text)
    video_count = Column(String(64))
    game_caty = Column(String(64))       # game category label
    user_caty = Column(String(64))       # user category label
    sys_time = Column(DateTime, default=datetime.datetime.now)  # row creation time
    sys_timeindex = Column(Integer)      # time bucket index supplied by the caller
-
-
-
def saveData(datas):
    """Persist one ORM object through a fresh session bound to ENGINE2.

    NOTE(review): ``ENGINE2`` is not defined in this chunk — presumably a
    module-level engine in the original file; confirm.
    """
    session = sessionmaker(ENGINE2)()
    try:
        session.add(datas)
        session.commit()
    except Exception:
        # BUG FIX: roll back on failure so the connection is not returned
        # to the pool inside a broken transaction.
        session.rollback()
        raise
    finally:
        # BUG FIX: the session was never closed, leaking pooled connections.
        session.close()
-
-
# Build one KsRoomInfo row from the crawled payloads and persist it.
# NOTE(review): ``room``, ``data``, ``data2``, ``sys_time`` and
# ``sys_timeindex`` are not defined in this chunk — this fragment most
# likely ran inside a crawl loop in the original file; confirm.
ks = KsRoomInfo(
    # roomId=data["data"]["sensitiveUserInfo"]["kwaiId"],
    roomId=room["args1"],
    cityName=data["data"]["sensitiveUserInfo"]["cityName"],
    # prefer the freshly crawled nickname, falling back to the cached one
    nickName=data2["nickName"] if data2["nickName"] else room["nickName"],
    fans=data["data"]["sensitiveUserInfo"]["counts"]["fan"],
    follow=data["data"]["sensitiveUserInfo"]["counts"]["follow"],
    video_count=data["data"]["sensitiveUserInfo"]["counts"]["photo"],
    # falls back to the nickname when no description was crawled
    desc=data2["desc"] if data2["desc"] else room["nickName"],
    sys_time=sys_time,
    sys_timeindex=sys_timeindex,
    game_caty=room["game_caty"],
    user_caty=room["user_caty"],
)

saveData(ks)
- # -*- coding:utf8 -*-
- # time: 19-1-18 下午3:56
- # Author:zhuhao
- # * coding: utf8 *
- """
- """
- import json
- import os
- from io import BytesIO
- import sys
-
- import bson
- from pymongo import MongoClient, ReturnDocument
-
-
- # from utils.constants import DataStatus
# Default MongoDB connection settings for the notice collection.
# NOTE(review): "collet" looks like a typo for "collect", but renaming the
# database would orphan existing data — left as-is.
url = "localhost"
db = "collet"
coll_name = "collect_notice"
class MongoDao(object):
    """Thin DAO wrapping a single MongoDB collection."""

    def __init__(self, url, db, coll_name):
        self.collection = self._get_coll(url=url, db=db, coll_name=coll_name)

    def _get_coll(self, url, db, coll_name):
        # BUG FIX: the original called bare ``_get_db`` (resolving to the
        # module-level helper by accident); call through self so the class
        # is self-contained.
        database = self._get_db(url, db)
        return database[coll_name]

    def _get_connection(self, url):
        """Open a MongoClient for *url*."""
        return MongoClient(url)

    def _get_db(self, url, db):
        """Return the database object *db* at *url*."""
        # BUG FIX: same missing-self bug as _get_coll.
        client = self._get_connection(url)
        return client[db]

    def search_one(self, filter=None):
        """Return one document matching *filter* (mutable default removed)."""
        return self.collection.find_one(filter or {}, no_cursor_timeout=True)

    def search(self, filter=None):
        """Return a no-timeout cursor over documents matching *filter*."""
        return self.collection.find(filter or {}, no_cursor_timeout=True)

    def insert_one(self, data=None):
        """Insert a single document."""
        self.collection.insert_one(document=data or {})

    def update_one(self, filter=None, data=None, upsert=False):
        """Apply a ``$set`` update to the first document matching *filter*.

        :param upsert: insert the document when no match exists
            (upsert = update + insert).
        """
        self.collection.update_one(filter=filter or {},
                                   update={"$set": data or {}},
                                   upsert=upsert)

    def search_upsert_one(self, filter=None, data=None, ids=None):
        """Upsert *data* keyed by *filter*, preserving/merging chosen fields.

        :param ids: mapping of field name -> 1 (merge: add the stored value
            onto the new one) or 0 (keep the stored value unchanged).
        """
        data = data or {}
        # BUG FIX: the original tested ``filter is not {}``, an identity
        # comparison against a fresh literal that is always True; test
        # truthiness instead so an empty filter skips the lookup.
        record = self.collection.find_one(filter=filter) if filter else None
        if record:
            for id_key, v in (ids or {}).items():
                if id_key in record:
                    # BUG FIX: isinstance() requires a type as its second
                    # argument; the original passed the stored *value* and
                    # raised TypeError whenever the merge path was taken.
                    if v == 1 and isinstance(data[id_key], type(record[id_key])):
                        data[id_key] += record[id_key]
                    else:
                        data[id_key] = record[id_key]
            self.collection.update_one(filter=filter, update={"$set": data})
        else:
            # no existing record: plain insert
            self.collection.insert_one(document=data)
-
-
-
-
-
-
def insertFile(file_path):
    """Insert a file (size < 16 MB) into MongoDB as a binary document.

    Prints the error and exits the process with status 1 on any failure
    (original behaviour preserved).
    """
    try:
        # BUG FIX: the original read the file twice (once into an unused
        # BytesIO via an unclosed-on-error handle); read it once with
        # ``with`` instead.
        with open(file_path, "rb") as fh:
            payload = fh.read()
        # conn = MongoClient('10.101.12.23', 27017)
        conn = MongoClient('101.132.167.173', 8003)
        try:
            coll = conn.mydb.test_set
            # BUG FIX: ``split(".")`` breaks on paths with extra dots
            # (e.g. "./dir/file.pdf"); splitext isolates only the final
            # extension.
            base, ext = os.path.splitext(file_path)
            # BUG FIX: Collection.save() is deprecated (removed in
            # PyMongo 4); insert_one is the equivalent for new documents.
            coll.insert_one(dict(
                content=bson.binary.Binary(payload),
                filename=base,
                file_format=ext.lstrip("."),
            ))
        finally:
            # BUG FIX: the client connection was never closed.
            conn.close()
    except Exception as e:
        print(e)
        sys.exit(1)
-
-
def file_read_binary(plugin_name, file_path=None):
    """Read a binary file stored in MongoDB and write it to disk.

    The output file is ``<file_path>/<plugin_name>.<file_format>``.
    Prints the error and exits with status 1 on any failure (original
    behaviour preserved).
    """
    try:
        conn = MongoClient('101.132.167.173', 8003)
        try:
            files = conn.mydb.test_set
            res = files.find_one({"filename": plugin_name})
            # BUG FIX: a missing document previously surfaced as an opaque
            # TypeError; report it explicitly (still exits with status 1).
            if res is None:
                raise LookupError("no stored file named %r" % plugin_name)
            print(res)
            # BUG FIX: file_path=None crashed os.path.join; default to the
            # current directory instead.
            out_name = os.path.join(file_path or "", plugin_name) + "." + res["file_format"]
            # BUG FIX: the output handle was never closed; use ``with``.
            with open(out_name, "wb") as fout:
                fout.write(res['content'])
        finally:
            conn.close()
    except Exception as e:
        print(e)
        sys.exit(1)
-
-
def _get_connection(url):
    """Open and return a MongoClient for *url*."""
    client = MongoClient(url)
    return client
-
-
def _get_db(url, db):
    """Return the database object named *db* on the server at *url*."""
    return _get_connection(url)[db]
-
-
def get_coll(url, db, coll_name):
    """Return the collection *coll_name* from database *db* at *url*."""
    database = _get_db(url, db)  # local name avoids shadowing the db param
    return database[coll_name]
-
-
def search(url, db, coll_name, filter={}):
    """Return a no-timeout cursor over documents matching *filter*."""
    coll = get_coll(url=url, db=db, coll_name=coll_name)
    cursor = coll.find(filter, no_cursor_timeout=True)
    return cursor
-
-
def search_one(url, db, coll_name, filter={}, collection=None):
    """Return one matching document, reusing *collection* when supplied."""
    coll = collection if collection is not None else get_coll(url=url, db=db, coll_name=coll_name)
    return coll.find_one(filter, no_cursor_timeout=True)
-
-
def fetch_one(url, db, coll_name, filter={}):
    """Atomically claim one matching document by setting status="1".

    Returns the document as it looks *after* the update, or None when
    nothing matched.
    """
    coll = get_coll(url=url, db=db, coll_name=coll_name)
    claim = {"$set": {"status": "1"}}
    return coll.find_one_and_update(filter, claim,
                                    return_document=ReturnDocument.AFTER)
-
-
def get_collection_names(url):
    """Return ``{database name: [collection names]}`` for every database
    on the MongoDB server at *url*.

    FIX: ``database_names()``/``collection_names()`` were deprecated in
    PyMongo 3.7 and removed in 4.0; use the ``list_*`` replacements.
    """
    client = _get_connection(url)
    return {db: client[db].list_collection_names()
            for db in client.list_database_names()}
-
-
def update_one(url, db, coll_name, filter={}, data={}, upsert=False, collection=None):
    """Apply a ``$set`` update to one document matching *filter*.

    BUG FIX: the original placed the update call in an ``else`` branch, so
    when *collection* was not supplied it fetched the collection and then
    silently did nothing. The update must run in both cases.
    """
    if collection is None:
        collection = get_coll(url=url, db=db, coll_name=coll_name)
    collection.update_one(filter=filter, update={"$set": data}, upsert=upsert)
-
-
def insert_one(url, db, coll_name, data={}):
    """Insert a single document into the named collection."""
    get_coll(url=url, db=db, coll_name=coll_name).insert_one(document=data)
-
-
def insert_or_update_one(url, db, coll_name, filter={}, data={}, id_key=None):
    """Upsert *data*: update the document matching *filter*, else insert.

    On update, the stored ``update_time`` and (when *id_key* is given) the
    identity field are carried over from the existing record.

    BUG FIX: the original guard ``filter is not {}`` compared identity with
    a fresh dict literal and was therefore always True; test truthiness so
    an empty filter skips the lookup as intended.
    """
    collection = get_coll(url=url, db=db, coll_name=coll_name)
    record = collection.find_one(filter=filter) if filter else None
    if record:
        # update path: keep stored timestamp / id so identity is stable
        if "update_time" in record:
            data["update_time"] = record["update_time"]
        if id_key is not None:
            data[id_key] = record[id_key]
        collection.update_one(filter=filter, update={"$set": data})
    else:
        # insert path
        collection.insert_one(document=data)
-
-
def save_or_update_one(url, db, coll_name, filter={}, data={}):
    """Upsert *data* keyed by *filter*, preserving the stored ``D_id``.

    FIX (robustness/consistency): the original read ``record["D_id"]``
    unguarded and raised KeyError for records lacking that field; guard on
    presence, matching the ``update_time`` check in insert_or_update_one.
    """
    collection = get_coll(url=url, db=db, coll_name=coll_name)
    record = collection.find_one(filter=filter)
    if record:
        # update path: preserve the stored document id when present
        if "D_id" in record:
            data["D_id"] = record["D_id"]
        collection.update_one(filter=filter, update={"$set": data})
    else:
        # insert path
        collection.insert_one(document=data)
-
-
if __name__ == "__main__":
    # Ad-hoc smoke test: list the collections of one pyspider result DB.
    # NOTE(review): hard-coded host/port — this hits a live server.
    # print (search(url = 'mongodb://101.132.167.173:8013/',db="mydb",coll_name="test_clean")[0]["result"])
    print(get_collection_names(url='mongodb://101.132.167.173:8014/')["pyspider_resultdb1"])
    # result = fetch_one(url='mongodb://101.132.167.173:8014/',
    #                    db="mydb",
    #                    coll_name="test_clean",
    #                    filter={'status': {'$exists': 1}})
    # print(result)
    # if result:
    #     print("00")
    # else:
    #     print("11")
# Page footer (scrape artifact): Copyright © 2003-2013 www.wpsshop.cn — all rights reserved.