当前位置:   article > 正文

Flume自定义SyslogSink_spdlog自定义sink

spdlog自定义sink

需求:收集客户端app应用日志,发送到其他代理agent上514syslog端口进行日志收集采样。

技术组件 :flume-ng

版本:flume1.7

其他:由于flume本身默认没有syslog的sink,只有syslog的source,所以我们可以自己定义syslog sink

测试syslog的javaDemo,功能实现ip校验,发送多条syslog日志 ,直接java -jar运行即可

  1. import java.net.URLDecoder;
  2. import java.util.Date;
  3. import org.productivity.java.syslog4j.Syslog;
  4. import org.productivity.java.syslog4j.SyslogIF;
  5. /**
  6. * SysLog发送数据
  7. * @author yl3395017
  8. */
  9. public class SyslogSend{
  10. public static void main(String[] args){
  11. if(args.length<2){
  12. System.out.println("error params you should be input ip and sentitems");
  13. }
  14. if(ipCheck(args[0])){
  15. try {
  16. //获取syslog的操作类,使用udp协议。syslog支持"udp", "tcp", "unix_syslog", "unix_socket"协议
  17. SyslogIF syslog = Syslog.getInstance("udp");
  18. //设置syslog服务器端地址
  19. syslog.getConfig().setHost(args[0]);
  20. //设置syslog接收端口,默认514
  21. syslog.getConfig().setPort(514);
  22. //拼接syslog日志,这个日志是自己定义的,通常我们定义成符合公司规范的格式就行,方便查询。例如 操作时间:2014年8月1日 操作者ID:张三 等。信息就是一个字符串。
  23. StringBuffer buffer = new StringBuffer();
  24. buffer.append("operatedate:" + new Date().toString().substring(4, 20) + ";");
  25. buffer.append("operatetimeID:" + "test" + ";");
  26. buffer.append("operatetime:" + new Date()+ ";");
  27. buffer.append("type:" + "22"+ ";");
  28. buffer.append("action:" + "update" + ";");
  29. buffer.append("common:" + "test syslog");
  30. /* 发送信息到服务器,2表示日志级别 范围为0~7的数字编码,表示了事件的严重程度。0最高,7最低
  31. * syslog为每个事件赋予几个不同的优先级:
  32. LOG_EMERG:紧急情况,需要立即通知技术人员。
  33. LOG_ALERT:应该被立即改正的问题,如系统数据库被破坏,ISP连接丢失。
  34. LOG_CRIT:重要情况,如硬盘错误,备用连接丢失。
  35. LOG_ERR:错误,不是非常紧急,在一定时间内修复即可。
  36. LOG_WARNING:警告信息,不是错误,比如系统磁盘使用了85%等。
  37. LOG_NOTICE:不是错误情况,也不需要立即处理。
  38. LOG_INFO:情报信息,正常的系统消息,比如骚扰报告,带宽数据等,不需要处理。
  39. LOG_DEBUG:包含详细的开发情报的信息,通常只在调试一个程序时使用。
  40. */
  41. System.out.println(buffer.toString());
  42. for(int i = 0;i<=Integer.parseInt(args[1]);i++){
  43. syslog.log(0, URLDecoder.decode(buffer.toString()+i,"utf-8"));
  44. }
  45. } catch (Exception e) {
  46. }
  47. }else{
  48. System.out.println("error params you should be input right ip");
  49. }
  50. }
  51. public static boolean ipCheck(String text) {
  52. if (text != null && !text.isEmpty()) {
  53. // 定义正则表达式
  54. String regex = "^(1\\d{2}|2[0-4]\\d|25[0-5]|[1-9]\\d|[1-9])\\."
  55. + "(1\\d{2}|2[0-4]\\d|25[0-5]|[1-9]\\d|\\d)\\."
  56. + "(1\\d{2}|2[0-4]\\d|25[0-5]|[1-9]\\d|\\d)\\."
  57. + "(1\\d{2}|2[0-4]\\d|25[0-5]|[1-9]\\d|\\d)$";
  58. // 判断ip地址是否与正则表达式匹配
  59. if (text.matches(regex)) {
  60. return true;
  61. // 返回判断信息
  62. //return text + "\n是一个合法的IP地址!";
  63. } else {
  64. return false;
  65. // 返回判断信息
  66. //return text + "\n不是一个合法的IP地址!";
  67. }
  68. }
  69. return false;
  70. // 返回判断信息
  71. // return "请输入要验证的IP地址!";
  72. }
  73. }

修正flume源码,实现syslog sink

依赖:

/**
 * flume-ng-configuration-1.7.0.jar  flume-ng-core-1.7.0.jar flume-ng-sdk-1.7.0.jar

 * */

  1. import java.io.UnsupportedEncodingException;
  2. import java.net.URLDecoder;
  3. import org.apache.flume.Channel;
  4. import org.apache.flume.Context;
  5. import org.apache.flume.Event;
  6. import org.apache.flume.EventDeliveryException;
  7. import org.apache.flume.Transaction;
  8. import org.apache.flume.conf.Configurable;
  9. import org.apache.flume.sink.AbstractSink;
  10. import org.apache.log4j.Logger;
  11. import org.productivity.java.syslog4j.Syslog;
  12. import org.productivity.java.syslog4j.SyslogIF;
  13. /**
  14. * flume-ng-configuration-1.7.0.jar flume-ng-core-1.7.0.jar
  15. * flume-ng-sdk-1.7.0.jar
  16. * */
  17. public class SyslogSink extends AbstractSink implements Configurable {
  18. private static final Logger logger = Logger.getLogger(SyslogSink.class);
  19. private static final String PROP_KEY_HOST = "destination";
  20. private String hostip;
  21. private SyslogIF syslog;
  22. @Override
  23. public void configure(Context context) {
  24. hostip = context.getString(PROP_KEY_HOST);
  25. syslog = Syslog.getInstance("udp");
  26. }
  27. @Override
  28. public Status process() throws EventDeliveryException {
  29. Channel ch = getChannel();
  30. Transaction txn = ch.getTransaction();
  31. Event event = null;
  32. txn.begin();
  33. try {
  34. while (true) {
  35. event = ch.take();
  36. if (event != null) {
  37. break;
  38. }
  39. }
  40. logger.info("Get event.");
  41. String body = new String(event.getBody());
  42. logger.info("event.getBody()-----" + body);
  43. String res = body + ":" + System.currentTimeMillis() + "\r\n";
  44. SendSyslog(res);
  45. txn.commit();
  46. return Status.READY;
  47. } catch (Throwable th) {
  48. txn.rollback();
  49. if (th instanceof Error) {
  50. throw (Error) th;
  51. } else {
  52. throw new EventDeliveryException(th);
  53. }
  54. } finally {
  55. txn.close();
  56. }
  57. }
  58. private void SendSyslog(String message) throws UnsupportedEncodingException {
  59. // 设置syslog服务器端地址
  60. syslog.getConfig().setHost(hostip);
  61. // 设置syslog接收端口,默认514
  62. syslog.getConfig().setPort(514);
  63. syslog.log(0, URLDecoder.decode(message, "utf-8"));
  64. }
  65. }
打包放到flume/bin下即可

config配置

a1.sources = r1
a1.sinks = k1
a1.channels = c1
 
a1.sources.r1.type = syslogudp
a1.sources.r1.port = 514
a1.sources.r1.host = 0.0.0.0

#自定义类型
a1.sinks.k1.type = com.neusoft.utils.SyslogSink
a1.sinks.k1.destination = 10.176.63.103

a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
 
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

启动flume即可

/soc/flume/apache-flume-1.7.0-bin/bin/flume-ng agent -c /soc/flume/apache-flume-1.7.0-bin/conf -f /soc/flume/apache-flume-1.7.0-bin/conf/file_roll.conf -n a1 -Dflume.root.logger=INFO,console



声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/很楠不爱3/article/detail/640258
推荐阅读
相关标签
  

闽ICP备14008679号