赞
踩
//创建环境 val env = StreamExecutionEnvironment.getExecutionEnvironment //基本的数据源 //1、端口数据源 env.socketTextStream(“master”,6666) //连接虚拟机等端口,master是虚拟机主机名,也可以是ip //2、文件数据源 env.readTextFile(path,"UTF_8") //path是要读取文件的路径,utf-8是字符编码 //3、本地数据源 env.fromSequeence(1,100) //是一个Seq,从1到100 内容 env.fromCollection(List(1,2,3,4)) //是一个集合列表,内容格式要一致 env.fromElements(1,2,3,"d") //单个元素,,内容格式可以不一样
kakfa producer生产者产生数据,flink读取
//创建环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//定义kakfa消费者配置文件
val props = new Properties()
//可以传入多个生产者,,master是主机名,也可以换成ip
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"master:9092,slave1:9092",)
//分组名称
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"gr01")
//读取kafka生产者产生的数据
//addSource需要的参数是一个SourceFunction,这里传入的是FlinkKafkaConsumer,也可以自定义,继承RichParallelSourceFunction即可
//FlinkKafkaConsumer:参数1,主题,参数2,序列化,参数3,配置文件
val inputStream = env.addSource(new FlinkKafkaConsumer[String]("test", new SimpleStringSchema(), props))
object MyJDBCSource { def main(args: Array[String]): Unit = { //创建环境 val env = StreamExecutionEnvironment.getExecutionEnvironment //调用addSource,传入自己写的自定义的Source env.addSource(new MyJDBCSourceFunc) .print() //启动 env.execute() } } //自定义Source数据源 //在同一个文件下创建一个类,然后继承RichParallelSourceFunction,是多线程的 //参数Worker是一个自己写的样例类,是根据数据中的数据来编写的 class MyJDBCSourceFunc extends RichParallelSourceFunction[Worker] { var conn:Connection = _ var statement:PreparedDtatement = _ var flag:Boolean = true //连接数据库 override def open(oarameters:Configuartion):Unit = { conn = DriverManager.getConnection("jdbc:mysql://localhos:3306/test?characterEncoding=utf-8&serverTimezone=UTC","root","147258") statemant = conn.prepareStatement("select * from stu") } //对从数据库中读取的数据做输出 override def run(ctx:SourceFunction.SourceContext[Worker]):Unit={ //用flag控制循环 while(flag){ Thread.sleep(5000) val resultSet = statement.executeQuery() while(restultSet.next()){ val id = resultSet.getInt(1) val name = resultSet.getString(2) ctx.collect(Worker(id,name)) } } } //控制循环override def cancel():Unit = flag =flase //关闭连接 override def close():Unit = { if(statemant!=null) statemant.close() if(conn!=null) conn.close() } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。