赞
踩
需求:收集客户端app应用日志,发送到其他代理agent上514syslog端口进行日志收集采样。
技术组件 :flume-ng
版本:flume1.7
其他:由于flume本身默认没有syslog的sink,只有syslog的source,所以我们可以自己定义syslog sink
测试syslog的javaDemo,功能实现ip校验,发送多条syslog日志 ,直接java -jar运行即可
- import java.net.URLDecoder;
- import java.util.Date;
-
- import org.productivity.java.syslog4j.Syslog;
- import org.productivity.java.syslog4j.SyslogIF;
-
- /**
- * SysLog发送数据
- * @author yl3395017
- */
- public class SyslogSend{
-
- public static void main(String[] args){
- if(args.length<2){
- System.out.println("error params you should be input ip and sentitems");
- }
- if(ipCheck(args[0])){
- try {
- //获取syslog的操作类,使用udp协议。syslog支持"udp", "tcp", "unix_syslog", "unix_socket"协议
- SyslogIF syslog = Syslog.getInstance("udp");
- //设置syslog服务器端地址
- syslog.getConfig().setHost(args[0]);
- //设置syslog接收端口,默认514
- syslog.getConfig().setPort(514);
- //拼接syslog日志,这个日志是自己定义的,通常我们定义成符合公司规范的格式就行,方便查询。例如 操作时间:2014年8月1日 操作者ID:张三 等。信息就是一个字符串。
- StringBuffer buffer = new StringBuffer();
- buffer.append("operatedate:" + new Date().toString().substring(4, 20) + ";");
- buffer.append("operatetimeID:" + "test" + ";");
- buffer.append("operatetime:" + new Date()+ ";");
- buffer.append("type:" + "22"+ ";");
- buffer.append("action:" + "update" + ";");
- buffer.append("common:" + "test syslog");
- /* 发送信息到服务器,2表示日志级别 范围为0~7的数字编码,表示了事件的严重程度。0最高,7最低
- * syslog为每个事件赋予几个不同的优先级:
- LOG_EMERG:紧急情况,需要立即通知技术人员。
- LOG_ALERT:应该被立即改正的问题,如系统数据库被破坏,ISP连接丢失。
- LOG_CRIT:重要情况,如硬盘错误,备用连接丢失。
- LOG_ERR:错误,不是非常紧急,在一定时间内修复即可。
- LOG_WARNING:警告信息,不是错误,比如系统磁盘使用了85%等。
- LOG_NOTICE:不是错误情况,也不需要立即处理。
- LOG_INFO:情报信息,正常的系统消息,比如骚扰报告,带宽数据等,不需要处理。
- LOG_DEBUG:包含详细的开发情报的信息,通常只在调试一个程序时使用。
- */
- System.out.println(buffer.toString());
- for(int i = 0;i<=Integer.parseInt(args[1]);i++){
- syslog.log(0, URLDecoder.decode(buffer.toString()+i,"utf-8"));
- }
-
- } catch (Exception e) {
- }
- }else{
- System.out.println("error params you should be input right ip");
-
- }
- }
-
- public static boolean ipCheck(String text) {
- if (text != null && !text.isEmpty()) {
- // 定义正则表达式
- String regex = "^(1\\d{2}|2[0-4]\\d|25[0-5]|[1-9]\\d|[1-9])\\."
- + "(1\\d{2}|2[0-4]\\d|25[0-5]|[1-9]\\d|\\d)\\."
- + "(1\\d{2}|2[0-4]\\d|25[0-5]|[1-9]\\d|\\d)\\."
- + "(1\\d{2}|2[0-4]\\d|25[0-5]|[1-9]\\d|\\d)$";
- // 判断ip地址是否与正则表达式匹配
- if (text.matches(regex)) {
- return true;
- // 返回判断信息
- //return text + "\n是一个合法的IP地址!";
- } else {
- return false;
- // 返回判断信息
- //return text + "\n不是一个合法的IP地址!";
- }
- }
- return false;
- // 返回判断信息
- // return "请输入要验证的IP地址!";
- }
- }
依赖:
/**
* flume-ng-configuration-1.7.0.jar flume-ng-core-1.7.0.jar flume-ng-sdk-1.7.0.jar
* */
- import java.io.UnsupportedEncodingException;
- import java.net.URLDecoder;
- import org.apache.flume.Channel;
- import org.apache.flume.Context;
- import org.apache.flume.Event;
- import org.apache.flume.EventDeliveryException;
- import org.apache.flume.Transaction;
- import org.apache.flume.conf.Configurable;
- import org.apache.flume.sink.AbstractSink;
- import org.apache.log4j.Logger;
- import org.productivity.java.syslog4j.Syslog;
- import org.productivity.java.syslog4j.SyslogIF;
-
-
- /**
- * flume-ng-configuration-1.7.0.jar flume-ng-core-1.7.0.jar
- * flume-ng-sdk-1.7.0.jar
- * */
- public class SyslogSink extends AbstractSink implements Configurable {
-
- private static final Logger logger = Logger.getLogger(SyslogSink.class);
- private static final String PROP_KEY_HOST = "destination";
- private String hostip;
- private SyslogIF syslog;
-
- @Override
- public void configure(Context context) {
- hostip = context.getString(PROP_KEY_HOST);
- syslog = Syslog.getInstance("udp");
- }
-
- @Override
- public Status process() throws EventDeliveryException {
- Channel ch = getChannel();
- Transaction txn = ch.getTransaction();
- Event event = null;
- txn.begin();
- try {
- while (true) {
- event = ch.take();
- if (event != null) {
- break;
- }
- }
- logger.info("Get event.");
- String body = new String(event.getBody());
- logger.info("event.getBody()-----" + body);
- String res = body + ":" + System.currentTimeMillis() + "\r\n";
- SendSyslog(res);
- txn.commit();
- return Status.READY;
- } catch (Throwable th) {
- txn.rollback();
- if (th instanceof Error) {
- throw (Error) th;
- } else {
- throw new EventDeliveryException(th);
- }
-
- } finally {
- txn.close();
- }
- }
-
- private void SendSyslog(String message) throws UnsupportedEncodingException {
- // 设置syslog服务器端地址
- syslog.getConfig().setHost(hostip);
- // 设置syslog接收端口,默认514
- syslog.getConfig().setPort(514);
- syslog.log(0, URLDecoder.decode(message, "utf-8"));
- }
-
- }
打包放到flume/bin下即可
config配置
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = syslogudp
a1.sources.r1.port = 514
a1.sources.r1.host = 0.0.0.0
#自定义类型
a1.sinks.k1.type = com.neusoft.utils.SyslogSink
a1.sinks.k1.destination = 10.176.63.103
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
/soc/flume/apache-flume-1.7.0-bin/bin/flume-ng agent -c /soc/flume/apache-flume-1.7.0-bin/conf -f /soc/flume/apache-flume-1.7.0-bin/conf/file_roll.conf -n a1 -Dflume.root.logger=INFO,console
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。