赞
踩
<!--
  Spring Integration IP module: supplies the org.springframework.integration.ip.*
  classes (UDP adapters and handlers) imported by the Java code in this file.
  NOTE(review): no <version> is given here - presumably it is managed by a parent
  POM/BOM; confirm before copying this into a standalone build.
-->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-ip</artifactId>
</dependency>
package com.liming.app.udp;

import com.liming.app.common.cache.CacheSingleton;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.Filter;
import org.springframework.integration.annotation.Router;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.annotation.Transformer;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.ip.dsl.Udp;
import org.springframework.integration.ip.udp.UnicastSendingMessageHandler;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;

import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.Map;

import static com.liming.app.modbusTcp.HexUtil.floatArrToIntArr;
import static com.liming.app.modbusTcp.HexUtil.stringToFloatArr;

/**
 * Receiving side of the UDP demo.
 *
 * <p>Inbound datagrams travel through the channels
 * {@code udpChannel -> udpFilter -> udpRouter -> udpHandle1 | udpHandle2},
 * each stage being one annotated method of this class.
 */
@Slf4j
@Configuration
public class UdpClientConfig {

    /** Local port the inbound adapter listens on ({@code udp.listeningPort}). */
    @Value("${udp.listeningPort}")
    public Integer listeningPort;

    /** Injected from {@code udp.sendingPort}; not read by any method of this class. */
    @Value("${udp.sendingPort}")
    public Integer sendingPort;

    /** Injected from {@code udp.sendingIp}; not read by any method of this class. */
    @Value("${udp.sendingIp}")
    public String sendingIp;

    /**
     * Defines the UDP receiving flow: an inbound adapter on {@link #listeningPort}
     * whose messages are published to {@code udpChannel}.
     *
     * @return the flow definition
     */
    @Bean
    public IntegrationFlow integrationFlow() {
        // Logged while the bean is being defined, i.e. before the flow is returned.
        log.info("UDP服务启动成功,端口号为: {}", listeningPort);
        return IntegrationFlows.from(Udp.inboundAdapter(listeningPort))
                .channel("udpChannel")
                .get();
    }

    /**
     * Transformer: turns the raw datagram bytes into text.
     *
     * @param payload raw bytes of the received datagram
     * @param headers message headers (currently unused)
     * @return the payload decoded as UTF-8
     * @throws UnsupportedEncodingException never thrown by this implementation; kept so
     *         the method signature stays unchanged
     */
    @Transformer(inputChannel = "udpChannel", outputChannel = "udpFilter")
    public String transformer(@Payload byte[] payload, @Headers Map<String, Object> headers)
            throws UnsupportedEncodingException {
        // Decode with an explicit charset so the result does not vary with the
        // JVM's default charset.
        // TODO: perform any further data conversion here.
        return new String(payload, StandardCharsets.UTF_8);
    }

    /**
     * Filter: decides whether a message continues to the router.
     *
     * <p>No filtering is implemented yet, so every message is accepted. The headers
     * {@code "id"}, {@code "ip_address"} and {@code "ip_port"} can be read from
     * {@code headers} to filter by message id, source IP or source port.
     *
     * @param message decoded message text
     * @param headers message headers
     * @return {@code true} to pass the message on, {@code false} to drop it
     */
    @Filter(inputChannel = "udpFilter", outputChannel = "udpRouter")
    public boolean filter(String message, @Headers Map<String, Object> headers) {
        // TODO: filter on message content and/or source headers.
        return true;
    }

    /**
     * Router: picks the channel whose handler should process the message.
     *
     * <p>No selection logic is implemented yet, so every message is sent to
     * {@code udpHandle2}. The headers {@code "id"}, {@code "ip_address"} and
     * {@code "ip_port"} can be read from {@code headers} to route by source.
     *
     * @param message decoded message text
     * @param headers message headers
     * @return name of the target channel
     */
    @Router(inputChannel = "udpRouter")
    public String router(String message, @Headers Map<String, Object> headers) {
        // TODO: replace this constant with a real routing decision.
        boolean useSecondHandler = true;
        return useSecondHandler ? "udpHandle2" : "udpHandle1";
    }

    /**
     * Final handler 1: logs the message.
     *
     * @param message decoded message text
     * @throws Exception declared for signature compatibility
     */
    @ServiceActivator(inputChannel = "udpHandle1")
    public void udpMessageHandle(String message) throws Exception {
        // TODO: processing could be done asynchronously here.
        log.info("message:{}", message);
    }

    /**
     * Final handler 2: logs the message.
     *
     * @param message decoded message text
     * @throws Exception declared for signature compatibility
     */
    @ServiceActivator(inputChannel = "udpHandle2")
    public void udpMessageHandle2(String message) throws Exception {
        log.info("UDP2:{}", message);
    }

    /**
     * Scratch entry point that only declares a sample comma-separated payload.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        String str = "701,50.0486,49.8133,500.486,157.029,1033.11,10.046,24.2811";
    }
}
//适用于upd发生地址不确定的情况 @Slf4j @RestController @RequestMapping("/udp") public class UdpController { @GetMapping("/start") public void start(){ String message = "123"; byte[] bytes = message.getBytes(); for (int i = 0; i < 10000; i++) { InetSocketAddress socketAddress = new InetSocketAddress("127.0.0.1", 11005); DatagramPacket datagramPacket = null; try (DatagramSocket datagramSocket = new DatagramSocket()) { datagramPacket = new DatagramPacket(bytes, bytes.length, socketAddress); datagramSocket.send(datagramPacket); datagramSocket.disconnect(); } catch (IOException e) { log.error(e.getMessage(), e); } } } }
/**
 * Sending side of the UDP demo: exposes a handler bound to one configured
 * destination.
 */
@Slf4j
@Configuration
public class UdpClientConfig {

    /** Destination port, injected from {@code udp.sendingPort}. */
    @Value("${udp.sendingPort}")
    public Integer sendingPort;

    /** Destination host or IP, injected from {@code udp.sendingIp}. */
    @Value("${udp.sendingIp}")
    public String sendingIp;

    /**
     * Message-sending configuration for a fixed IP and port.
     *
     * <p>NOTE(review): the third constructor argument is {@code true}. It looks like
     * this is the handler's check-length flag, in which case the receiving side
     * presumably has to be configured to match - confirm against the
     * {@code UnicastSendingMessageHandler} documentation.
     *
     * @return handler that sends to {@link #sendingIp}:{@link #sendingPort},
     *         registered under the bean name {@code unicastSendingMessageHandler}
     */
    @Bean(name = "unicastSendingMessageHandler")
    public UnicastSendingMessageHandler sending() {
        final boolean thirdArgument = true;
        UnicastSendingMessageHandler handler =
                new UnicastSendingMessageHandler(sendingIp, sendingPort, thirdArgument);
        return handler;
    }
}
package com.liming.app.udp;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.expression.FunctionExpression;
import org.springframework.integration.ip.udp.UnicastSendingMessageHandler;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;

/**
 * REST controller exposing two ways of firing test datagrams: through the injected
 * {@link UnicastSendingMessageHandler} ({@code /udp/send}) and through a plain
 * {@link DatagramSocket} ({@code /udp/start}).
 */
@Slf4j
@RestController
@RequestMapping("/udp")
public class UdpController {

    /** Host the raw-socket test datagrams are sent to. */
    private static final String TARGET_HOST = "127.0.0.1";

    /** Port the raw-socket test datagrams are sent to. */
    private static final int TARGET_PORT = 11005;

    /** Number of messages sent per request by both endpoints. */
    private static final int PACKET_COUNT = 10000;

    @Autowired
    private UnicastSendingMessageHandler unicastSendingMessageHandler;

    /**
     * Sends {@code message} {@code PACKET_COUNT} times through the injected handler.
     *
     * @param message text to send, taken from the {@code message} request parameter
     */
    @GetMapping("/send")
    public void send(@RequestParam(value = "message") String message) {
        for (int i = 0; i < PACKET_COUNT; i++) {
            // A fresh message is built on every iteration, as before.
            Message<String> outbound = MessageBuilder.withPayload(message).build();
            unicastSendingMessageHandler.handleMessage(outbound);
        }
    }

    /**
     * Sends {@code PACKET_COUNT} datagrams with the payload {@code "123"} to
     * {@code TARGET_HOST:TARGET_PORT} using a plain socket.
     *
     * <p>A single socket and a single packet are reused for all sends instead of
     * being recreated on every iteration. A failed send is logged and the loop
     * continues; a failure to open the socket is logged once and nothing is sent.
     */
    @GetMapping("/start")
    public void start() {
        byte[] bytes = "123".getBytes(StandardCharsets.UTF_8);
        try (DatagramSocket datagramSocket = new DatagramSocket()) {
            InetSocketAddress socketAddress = new InetSocketAddress(TARGET_HOST, TARGET_PORT);
            DatagramPacket datagramPacket = new DatagramPacket(bytes, bytes.length, socketAddress);
            for (int i = 0; i < PACKET_COUNT; i++) {
                try {
                    datagramSocket.send(datagramPacket);
                } catch (IOException e) {
                    // Best effort: one failed send must not stop the remaining ones.
                    log.error(e.getMessage(), e);
                }
            }
        } catch (IOException e) {
            log.error(e.getMessage(), e);
        }
    }
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。