当前位置:   article > 正文

Hadoop的log4j审计日志文件_hdfs.audit.log.maxbackupindex

hdfs.audit.log.maxbackupindex

      自定义修改hadoop/conf/log4j.properties

       hdfs审计日志(Auditlog)记录了用户针对hdfs的所有操作,详细信息包括操作成功与否、用户名称、客户机地址、操作命令、操作的目录等。对于用户的每一个操作,namenode都会将这些信息以key-value对的形式组织成固定格式的一条日志,然后记录到audit.log文件中。通过审计日志,我们可以实时查看hdfs的各种操作状况、可以追踪各种误操作、可以做一些指标监控等等。

       hdfs的审计日志功能是可插拔的,用户可以通过实现默认接口扩展出满足自己所需的插件来替换hdfs默认提供的审计日志功能,或者与之并用。

启用审计日志

        如果仅仅只启用默认的AuditLogger(DefaultAuditLogger),则在log4j.properties添加如下配置(hdfs.audit.logger必须配置为INFO级别)即可,审计日志会与namenode的系统日志独立开来保存,log4j.appender.RFAAUDIT.File可配置保存的位置及文件。 FSNamesystem根据log4j.properties中hdfs.audit.logger是否为INFO,以及是否配置了DefaultAuditLogger之外的其他AuditLogger,来决定是否启用审计日志功能。

  1. #
  2. # hdfs audit logging
  3. #
  4. hdfs.audit.logger=INFO,NullAppender
  5. hdfs.audit.log.maxfilesize=256MB
  6. hdfs.audit.log.maxbackupindex=20
  7. log4j.logger.org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit=${hdfs.audit.logger}
  8. log4j.additivity.org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit=false
  9. log4j.appender.RFAAUDIT=org.apache.log4j.RollingFileAppender
  10. log4j.appender.RFAAUDIT.File=${hadoop.log.dir}/hdfs-audit.log
  11. log4j.appender.RFAAUDIT.layout=org.apache.log4j.PatternLayout
  12. log4j.appender.RFAAUDIT.layout.ConversionPattern=%d{ISO8601} %p %c{2}: %m%n
  13. log4j.appender.RFAAUDIT.MaxFileSize=${hdfs.audit.log.maxfilesize}
  14. log4j.appender.RFAAUDIT.MaxBackupIndex=${hdfs.audit.log.maxbackupindex}

审计日志的接口及实现

Namenode开放了AuditLogger接口,并定义抽象类HdfsAuditLogger 实现AuditLogger,默认提供实现类DefaultAuditLogger。构造FSNamesystem时通过initAuditLoggers(Configuration conf)方法创建AuditLogger列表。在记录用户操作时,会将操作信息逐一传给列表中的每一个AuditLogger,由其做对应的审计处理。

       通过实现Auditloger接口或者扩展HdfsAuditLogger类,用户可以实现自己的AuditLogger来满足所需,例如有针对性的记录审计日志(当集群、访问量上规模之后疯狂刷日志必然对namenode有影响,有针对性的记录有必要的日志是缓解此状况的一种可选方案)、扩展功能、将日志接入实时系统做实时分析或监控等。用户通过配置项dfs.namenode.audit.loggers在hdfs-site.xml中配置Auditloger的实现类,多个实现可以通过逗号分开,更改配置后重启namenode接口生效。FSNamesystem的initAuditLoggers(Configuration conf)方法通过该配置项加载并实例化实现类,初始化后存入集合。如果用户没有配置,那么默认使用DefaultAuditLogger。如果启动了nntop功能,还会使用TopAuditLogger。

FSNamesystem 初始化所有的AuditLogger:

  1. 1private List<AuditLogger> initAuditLoggers(Configuration conf) {
  2. 2 // Initialize the custom access loggers if configured.
  3. 3 //DFS_NAMENODE_AUDIT_LOGGERS_KEY=dfs.namenode.audit.loggers
  4. 4 Collection<String> alClasses = conf.getStringCollection(DFS_NAMENODE_AUDIT_LOGGERS_KEY);
  5. 5 List<AuditLogger> auditLoggers = Lists.newArrayList();
  6. 6 if (alClasses != null && !alClasses.isEmpty()) {
  7. 7 for (String className : alClasses) {
  8. 8 try {
  9. 9 AuditLogger logger;
  10. 10 if (DFS_NAMENODE_DEFAULT_AUDIT_LOGGER_NAME.equals(className)) {
  11. 11 logger = new DefaultAuditLogger();
  12. 12 } else {
  13. 13 logger = (AuditLogger) Class.forName(className).newInstance();
  14. 14 }
  15. 15 logger.initialize(conf);
  16. 16 auditLoggers.add(logger);
  17. 17 } catch (RuntimeException re) {
  18. 18 throw re;
  19. 19 } catch (Exception e) {
  20. 20 throw new RuntimeException(e);
  21. 21 }
  22. 22 }
  23. 23 }
  24. 24
  25. 25 // Make sure there is at least one logger installed.
  26. 26 // 如果用户没有提供AuditLoggers,则默认使用DefaultAuditLogger
  27. 27 if (auditLoggers.isEmpty()) {
  28. 28 auditLoggers.add(new DefaultAuditLogger());
  29. 29 }
  30. 30
  31. 31 // Add audit logger to calculate top users
  32. 32 // 默认topConf.isEnabled是开启的,用于指标聚合、上报
  33. 33 // TopAuditLogger类似 top命令
  34. 34 if (topConf.isEnabled) {
  35. 35 topMetrics = new TopMetrics(conf, topConf.nntopReportingPeriodsMs);
  36. 36 auditLoggers.add(new TopAuditLogger(topMetrics));
  37. 37 }
  38. 38
  39. 39 return Collections.unmodifiableList(auditLoggers);
  40. 40 }

DefaultAuditLogger记录日志:

  1. 1@Override
  2. 2 public void logAuditEvent(boolean succeeded, String userName,
  3. 3 InetAddress addr, String cmd, String src, String dst,
  4. 4 FileStatus status, UserGroupInformation ugi,
  5. 5 DelegationTokenSecretManager dtSecretManager) {
  6. 6 if (auditLog.isInfoEnabled()) {
  7. 7 final StringBuilder sb = auditBuffer.get();
  8. 8 sb.setLength(0);
  9. 9 sb.append("allowed=").append(succeeded).append("\t");
  10. 10 sb.append("ugi=").append(userName).append("\t");
  11. 11 sb.append("ip=").append(addr).append("\t");
  12. 12 sb.append("cmd=").append(cmd).append("\t");
  13. 13 sb.append("src=").append(src).append("\t");
  14. 14 sb.append("dst=").append(dst).append("\t");
  15. 15 if (null == status) {
  16. 16 sb.append("perm=null");
  17. 17 } else {
  18. 18 sb.append("perm=");
  19. 19 sb.append(status.getOwner()).append(":");
  20. 20 sb.append(status.getGroup()).append(":");
  21. 21 sb.append(status.getPermission());
  22. 22 }
  23. 23 if (logTokenTrackingId) {
  24. 24 sb.append("\t").append("trackingId=");
  25. 25 String trackingId = null;
  26. 26 if (ugi != null && dtSecretManager != null
  27. 27 && ugi.getAuthenticationMethod() == AuthenticationMethod.TOKEN) {
  28. 28 for (TokenIdentifier tid: ugi.getTokenIdentifiers()) {
  29. 29 if (tid instanceof DelegationTokenIdentifier) {
  30. 30 DelegationTokenIdentifier dtid =
  31. 31 (DelegationTokenIdentifier)tid;
  32. 32 trackingId = dtSecretManager.getTokenTrackingId(dtid);
  33. 33 break;
  34. 34 }
  35. 35 }
  36. 36 }
  37. 37 sb.append(trackingId);
  38. 38 }
  39. 39 sb.append("\t").append("proto=");
  40. 40 sb.append(NamenodeWebHdfsMethods.isWebHdfsInvocation() ? "webhdfs" : "rpc");
  41. 41 logAuditMessage(sb.toString());
  42. 42 }
  43. 43 }
  44. 44
  45. 45 public void logAuditMessage(String message) {
  46. 46 auditLog.info(message);
  47. 47 }

审计过程

       客户端对hdfs的所有操作,不管成功与否都会由FSNamesystem记录下。以删除操作为例,FSNamesystem在正常删除给定src后调用logAuditEvent(true, "delete", src)记录此次成功的delete操作,如果删除失败抛出异常,则调用logAuditEvent(false, "delete", src)记录此次失败的delete操作。

  1. 1boolean delete(String src, boolean recursive, boolean logRetryCache)
  2. 2 throws IOException {
  3. 3 waitForLoadingFSImage();
  4. 4 BlocksMapUpdateInfo toRemovedBlocks = null;
  5. 5 writeLock();
  6. 6 boolean ret = false;
  7. 7 try {
  8. 8 checkOperation(OperationCategory.WRITE);
  9. 9 checkNameNodeSafeMode("Cannot delete " + src);
  10. 10 toRemovedBlocks = FSDirDeleteOp.delete(
  11. 11 this, src, recursive, logRetryCache);
  12. 12 ret = toRemovedBlocks != null;
  13. 13 } catch (AccessControlException e) {
  14. 14 logAuditEvent(false, "delete", src);
  15. 15 throw e;
  16. 16 } finally {
  17. 17 writeUnlock();
  18. 18 }
  19. 19 getEditLog().logSync();
  20. 20 if (toRemovedBlocks != null) {
  21. 21 removeBlocks(toRemovedBlocks); // Incremental deletion of blocks
  22. 22 }
  23. 23 logAuditEvent(true, "delete", src);
  24. 24 return ret;
  25. 25 }
  26. 26
  27. 27
  28. 28
  29. 29//判断是否是外部调用,只对rpc调用和webHdfs调用做审计
  30. 30 boolean isExternalInvocation() {
  31. 31 return Server.isRpcInvocation() || NamenodeWebHdfsMethods.isWebHdfsInvocation();
  32. 32 }
  33. 33
  34. 34 //判断是否启用审计日志功能
  35. 35 public boolean isAuditEnabled() {
  36. 36 return !isDefaultAuditLogger || auditLog.isInfoEnabled();
  37. 37 }
  38. 38
  39. 39 //succeeded:操作是否成功 cmd:操作命令 src:操作对象
  40. 40 private void logAuditEvent(boolean succeeded, String cmd, String src)
  41. 41 throws IOException {
  42. 42 logAuditEvent(succeeded, cmd, src, null, null);
  43. 43 }
  44. 44
  45. 45 private void logAuditEvent(boolean succeeded, String cmd, String src,
  46. 46 String dst, HdfsFileStatus stat) throws IOException {
  47. 47 if (isAuditEnabled() && isExternalInvocation()) {
  48. 48 logAuditEvent(succeeded, getRemoteUser(), getRemoteIp(),
  49. 49 cmd, src, dst, stat);
  50. 50 }
  51. 51 }
  52. 52
  53. 53 //获取操作对象的信息,调用所有的auditloger 做审计
  54. 54 private void logAuditEvent(boolean succeeded,
  55. 55 UserGroupInformation ugi, InetAddress addr, String cmd, String src,
  56. 56 String dst, HdfsFileStatus stat) {
  57. 57 FileStatus status = null;
  58. 58 if (stat != null) {
  59. 59 Path symlink = stat.isSymlink() ? new Path(stat.getSymlink()) : null;
  60. 60 Path path = dst != null ? new Path(dst) : new Path(src);
  61. 61 status = new FileStatus(stat.getLen(), stat.isDir(),
  62. 62 stat.getReplication(), stat.getBlockSize(), stat.getModificationTime(),
  63. 63 stat.getAccessTime(), stat.getPermission(), stat.getOwner(),
  64. 64 stat.getGroup(), symlink, path);
  65. 65 }
  66. 66 for (AuditLogger logger : auditLoggers) {
  67. 67 if (logger instanceof HdfsAuditLogger) {
  68. 68 HdfsAuditLogger hdfsLogger = (HdfsAuditLogger) logger;
  69. 69 hdfsLogger.logAuditEvent(succeeded, ugi.toString(), addr, cmd, src, dst,
  70. 70 status, ugi, dtSecretManager);
  71. 71 } else {
  72. 72 logger.logAuditEvent(succeeded, ugi.toString(), addr,
  73. 73 cmd, src, dst, status);
  74. 74 }
  75. 75 }
  76. 76 }

审计日志接入实时系统的方法

  • 方法1:扩展Log4J的appender,由appender将日志发送到kafka。
  • 方法2:直接让kafka的producer读取日志文件。

hadoop日志级别设置

在hadoop/bin/ hadoop-daemon.sh文件下

export HADOOP_ROOT_LOGGER="DEBUG,DRFA"

自定义日志

目标:将需要的信息写入自己指定的独立的日志中。
需求:这次只是一个尝试,在DFSClient中,将部分内容写入指定的日志文件中。在客户端读取HDFS数据时,将读的blockID写入文件。
步骤:
1、修改hadoop/conf/log4j.properties文件。在文件末尾添加如下内容:
#为写日志的操作取个名字,MyDFSClient。用来在DFSClient中获取该日志的实例。并指定输出方式为自定义的OUT
log4j.logger.MyDFSClient=DEBUG,OUT
#设置OUT的输出方式为输出到文件
log4j.appender.OUT=org.apache.log4j.FileAppender
#设置文件路径
log4j.appender.OUT.File=${hadoop.log.dir}/DFSClient.log
#设置文件的布局
log4j.appender.OUT.layout=org.apache.log4j.PatternLayout
#设置文件的格式
log4j.appender.OUT.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
#设置该日志操作不与父类日志操作重叠
log4j.additivity.MyDFSClient=false
2、保存该文件,复制到集群各个节点的hadoop/conf目录下,替换原有的文件。
3、修改DFSClient类
这里只是简单的为了验证这个过程的正确性,以后还回加入更有意义的日志内容。
首先在DFSClient类中声明一个LOG实例:
public static final Log myLOG = LogFactory.getLog("MyDFSClient");
在read(byte buf[], int off, int len)函数中,添加如下代码:
myLOG.info("Read Block!!!!");
if(currentBlock!=null)
myLOG.info("Read block: "+currentBlock.getBlockId());

4、重新启动hadoop。

5、这里使用dfs命令进行测试。
$bin/hadoop dfs -cat /user/XXX/out/part-r-00000
可以看到文件part-r-00000的内容输出到屏幕。这时在/hadoop/logs/DFSClient.log文件中,可以看到刚才在类中记录的日志。验证成功。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/204796
推荐阅读
相关标签
  

闽ICP备14008679号