赞
踩
public class FinkTest { public static void main(String[] args) throws Exception{ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);//摄入时间 //env.enableCheckpointing(5000); //创建kafka-topic Properties p = LoadResourcesUtils.getProperties("kafka.properties"); String inputTopic = p.getProperty("source.inputTopic"); String outputTopic = p.getProperty("source.outputTopic"); //kafka addSource DataStream<String> kafkaStream = env.addSource(KafkaStreamBuilder.kafkaConsumer(inputTopic)); //kafka addSink kafkaStream.addSink(KafkaSink.KafkaProducer(driversTopicPattern)); //mysql addSink kafkaStream.addSink(new OrderMySqlSink()); //hbase addSink kafkaStream..addSink(new HbaseSink(configs.topicOut)); //自定义 addSource DataStream<String> myStream = env.addSource(new MySource()); //mysql addSource DataStream<String> driverStream = env.addSource(new MySqlSource()); env.execute("my flink job"); } }
import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Properties; /** * @author liquan * * 构建KafkaStream * */ public class KafkaStreamBuilder { public static FlinkKafkaConsumer<String> kafkaConsumer(String topics) { Properties p = LoadResourcesUtils.getProperties("application.properties"); Properties properties = new Properties(); properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer"); properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer"); properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, p.getProperty("spring.kafka.bootstrap-servers")); properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, p.getProperty("spring.kafka.consumer.group-id")); properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, p.getProperty("spring.kafka.consumer.auto-offset-reset")); properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, p.getProperty("spring.kafka.consumer.enable-auto-commit")); properties.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RangeAssignor"); // String topics = consumerConfig.getTopics(); List<String> topicsSet = new ArrayList<String>(Arrays.asList(topics.split(","))); FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<String>(topicsSet, new SimpleStringSchema(), properties);//test0是kafka中开启的topic // myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter()); return myConsumer; } }
import com.shengekeji.simulator.serialization.OutSerializationSchema; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; import org.apache.kafka.clients.consumer.ConsumerConfig; import java.util.Properties; public class KafkaSink { public static FlinkKafkaProducer<String> KafkaProducer(String topics) { Properties p = LoadResourcesUtils.getProperties("application.properties"); Properties properties = new Properties(); properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer"); properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer"); properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, p.getProperty("spring.kafka.bootstrap-servers")); properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, p.getProperty("spring.kafka.consumer.group-id")); properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, p.getProperty("spring.kafka.consumer.auto-offset-reset")); properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, p.getProperty("spring.kafka.consumer.enable-auto-commit")); properties.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RangeAssignor"); return new FlinkKafkaProducer<>(topics, new OutSerializationSchema(), properties); } }
import com.alibaba.fastjson.JSONObject; import com.shengekeji.simulator.dao.OrderDao; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.ibatis.session.SqlSession; import com.shengekeji.simulator.model.OrderModel; public class OrderMySqlSink extends RichSinkFunction<String> { @Override public void invoke(String value, Context context) throws Exception { SqlSession sqlSession = null; try { OrderModel order= JSONObject.parseObject(value, OrderModel.class); sqlSession = MyBatisUtil.openSqlSession(); // 通过SqlSession对象得到Mapper接口的一个代理对象 // 需要传递的参数是Mapper接口的类型 OrderDao dao = sqlSession.getMapper(OrderDao.class); System.err.println(order); dao.insert(order); sqlSession.commit(); }catch (Exception e){ e.printStackTrace(); System.err.println(e.getMessage()); sqlSession.rollback(); }finally { if (sqlSession != null){ sqlSession.close(); } } } }
注:数据入库采用 MyBatis 方式,MyBatisUtil、OrderDao、OrderModel 需根据自己的环境自行定义。
import org.apache.flink.streaming.api.functions.source.SourceFunction; import java.util.Properties; public class MySource implements SourceFunction<String> { private static final long serialVersionUID = 1L; private volatile boolean isRunning = true; @Override public void run(SourceContext<String> sourceContext) throws Exception { while(this.isRunning) { Thread.sleep(6000); String order = getDriverData(); sourceContext.collect(order); } } @Override public void cancel() { this.isRunning = false; } //随机产生订单数据 public String getDriverData() { Properties p = LoadResourcesUtils.getProperties("content.properties"); String driverJson = p.getProperty("source.driverJson"); String value = driverJson ; if(value.indexOf("%orderId") >= 0){ value = value.replaceAll("%orderId",RandomUtil.getOrderId()); } if(value.indexOf("%appId") >= 0){ value = value.replaceAll("%appId",RandomUtil.getAppId()); } if(value.indexOf("%serviceId") >= 0){ value = value.replaceAll("%serviceId",RandomUtil.getServiceId()); } if(value.indexOf("%passageId") >= 0){ value = value.replaceAll("%passageId",RandomUtil.getPassageId()); } if(value.indexOf("%driverId") >= 0){ value = value.replaceAll("%driverId",RandomUtil.getDriverId()); } if(value.indexOf("%startLoclatitude") >= 0){ LngLat startLoc=RandomUtil.getCoordinate(); value = value.replaceAll("%startLoclatitude",Double.toString(startLoc.latitude)); value = value.replaceAll("%startLoclongitude",Double.toString(startLoc.longitude)); } if(value.indexOf("%endLoclatitude") >= 0){ LngLat endLoc=RandomUtil.getCoordinate(); value = value.replaceAll("%endLoclatitude",Double.toString(endLoc.latitude)); value = value.replaceAll("%endLoclongitude",Double.toString(endLoc.longitude)); } if(value.indexOf("%loclatitude") >= 0){ LngLat loc=RandomUtil.getCoordinate(); value = value.replaceAll("%loclatitude",Double.toString(loc.latitude)); value = value.replaceAll("%loclongitude",Double.toString(loc.longitude)); } if(value.indexOf("%flag") >= 0){ value = 
value.replaceAll("%flag",Integer.toString(RandomUtil.getFlag())); } if(value.indexOf("%pushFlag") >= 0){ value = value.replaceAll("%pushFlag",Integer.toString(RandomUtil.getPushFlag())); } if(value.indexOf("%state") >= 0){ value = value.replaceAll("%state",Integer.toString(RandomUtil.getState())); } if(value.indexOf("%d") >= 0){ value = value.replaceAll("%d", RandomUtil.getNum().toString()); } if(value.indexOf("%s") >= 0){ value = value.replaceAll("%s", RandomUtil.getStr()); } if(value.indexOf("%f") >= 0){ value = value.replaceAll("%f",RandomUtil.getDoubleStr()); } if(value.indexOf("%ts") >= 0){ value = value.replaceAll("%ts",RandomUtil.getTimeStr()); } if(value.indexOf("%tl") >= 0){ value = value.replaceAll("%tl",RandomUtil.getTimeLongStr()); } System.out.println(value); return value; } }
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.serializer.SerializerFeature; import com.shengekeji.simulator.dao.BestDispatchDao; import com.shengekeji.simulator.dao.DriverDao; import com.shengekeji.simulator.model.DispatchModel; import com.shengekeji.simulator.model.DriverModel; import com.shengekeji.simulator.model.GeographyOrder; import com.shengekeji.simulator.model.PassagesModel; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.ibatis.session.SqlSession; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; public class MySqlDriverSource implements SourceFunction<String> { private static final long serialVersionUID = 1L; private volatile boolean isRunning = true; /** * 此处是代码的关键,要从mysql表中,把数据读取出来 * @param sourceContext * @throws Exception */ @Override public void run(SourceContext<String> sourceContext) throws Exception { while(this.isRunning) { Thread.sleep(5000); System.out.println("--------------------------"); SqlSession sqlSession = null; Map<String,Object> map = new HashMap<String, Object>(); map.put("appId","SGKJ"); try { sqlSession = MyBatisUtil.openSqlSession(); // 通过SqlSession对象得到Mapper接口的一个代理对象 // 需要传递的参数是Mapper接口的类型 //司机信息数据 DriverDao driverdao = sqlSession.getMapper(DriverDao.class); List<DriverModel> drivers = driverdao.selectAllActiveDriver(map); //处理每个司机 for (DriverModel driver:drivers){ driver.setLoc(new LngLat(locLongitude,locLatitude)); driver.setSendTime(RandomUtil.getTimeStr()); String dr = JSONObject.toJSONString(driver, SerializerFeature.DisableCircularReferenceDetect); System.out.println(dr); sourceContext.collect(dr); } }catch (Exception e){ e.printStackTrace(); System.err.println(e.getMessage()); sqlSession.rollback(); }finally { if (sqlSession != null){ sqlSession.close(); } } } } @Override public void cancel() { this.isRunning = false; } }
注:数据读取采用 MyBatis 方式,MyBatisUtil、DriverDao、DriverModel 需根据自己的环境自行定义。
import com.shengekeji.owl.constant.Constants; import com.shengekeji.owl.pojo.Message; import com.shengekeji.owl.util.HBaseUtil; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.*; import java.util.ArrayList; import java.util.List; public class HbaseSink extends RichSinkFunction<Message> { private Integer maxSize = 1000; private Long delayTime = 5000L; private String tableName; public HbaseSink(String tableName) { this.tableName = tableName; } public HbaseSink(Integer maxSize, Long delayTime) { this.maxSize = maxSize; this.delayTime = delayTime; } private Connection connection; private Long lastInvokeTime; private List<Put> puts = new ArrayList<Put>(); // 创建连接 @Override public void open(Configuration parameters) throws Exception { super.open(parameters); connection = HBaseUtil.getConnection(Constants.ZOOKEEPER_QUORUM,Constants.ZOOKEEPER_PORT); // 获取系统当前时间 lastInvokeTime = System.currentTimeMillis(); } @Override public void invoke(Message value, Context context) throws Exception { System.out.println(value); String rk = value.id+"-"+value.ts; //创建put对象,并赋rk值 Put put = new Put(rk.getBytes()); // 添加值:f1->列族, order->属性名 如age, 第三个->属性值 如25 put.addColumn("cf1".getBytes(), "id".getBytes(), value.id.getBytes()); put.addColumn("cf1".getBytes(), "vals".getBytes(), value.vals.getBytes()); put.addColumn("cf1".getBytes(), "p".getBytes(), (value.p+"").getBytes()); put.addColumn("cf1".getBytes(), "ts".getBytes(), (value.ts+"").getBytes()); System.out.println("----------"); System.out.println(put); puts.add(put);// 添加put对象到list集合 //使用ProcessingTime long currentTime = System.currentTimeMillis(); System.out.println(currentTime - lastInvokeTime); //开始批次提交数据 if (puts.size() == maxSize || currentTime - lastInvokeTime >= delayTime) { //获取一个Hbase表 Table table = connection.getTable(TableName.valueOf(tableName)); table.put(puts);//批次提交 
puts.clear(); lastInvokeTime = currentTime; table.close(); } } @Override public void close() throws Exception { connection.close(); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。