当前位置:   article > 正文

Python连接 Mysql、MongoDB、Redis、Doris、kafka方法汇总_pyspark 怎么连接 doris

pyspark 怎么连接 doris

Python连接 Mysql、MongoDB、Redis、Doris方法汇总

yaml

在这里插入图片描述

with open('./config.yaml') as f:
    load = yaml.safe_load(f)
    redis_host = load['redis']['host']
    redis_port = load['redis']['port']
    redis_pw = load['redis']['password']
    env = load['env']
    __mongo_url__ = load['mongo']['url']
    amap_url = load['amap']['url']

__pool__ = redis.ConnectionPool(host=redis_host, port=redis_port, password=redis_pw)
# 连接 mongodb
mongo_client = pymongo.MongoClient(__mongo_url__)
# 连接 redis
redis_conn = redis.Redis(connection_pool=__pool__)
# 连接 kafka
servers_ = load['kafka']['bootstrap_servers']
kafka_producer = KafkaProducer(bootstrap_servers=servers_, compression_type='lz4', batch_size=0,)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

sqlalchemy

在这里插入图片描述

# 连接 redis
PROXY_REDIS_CONN = StrictRedis().from_url(REDIS_URI.format(252))
# 连接 mysql
etl_read_cache_engine = create_engine(
    CACHE_ORIGIN_DATA_CONF['url'],
    max_overflow=CACHE_ORIGIN_DATA_CONF['max_overflow'],  # 超过连接池大小外最多创建的连接
    pool_size=CACHE_ORIGIN_DATA_CONF['pool_size'],  # 连接池大小
    pool_timeout=CACHE_ORIGIN_DATA_CONF['pool_timeout'],  # 池中没有线程最多等待的时间,否则报错
    pool_recycle=CACHE_ORIGIN_DATA_CONF['pool_recycle']  # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
ReadCacheSession = sessionmaker(bind=etl_read_cache_engine)
etl_read_cache_session = scoped_session(ReadCacheSession)


  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号