当前位置:   article > 正文

Flink 使用 HanLP 进行情感分析（HanLP 情绪识别）

hanlp 情绪识别

依赖

  <!-- Versions shared by the Flink artifacts below. -->
  <properties>
      <flink.version>1.12.5</flink.version>
      <!-- Scala binary version: suffix of the Scala-dependent Flink artifact IDs. -->
      <scala.version>2.12</scala.version>
  </properties>

  <dependencies>
      <!-- Flink core Java API. -->
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-java</artifactId>
          <version>${flink.version}</version>
      </dependency>

      <!-- DataStream API (StreamExecutionEnvironment, windows, watermarks). -->
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-streaming-java_${scala.version}</artifactId>
          <version>${flink.version}</version>
      </dependency>

      <!-- Client/executor support needed to run env.execute() from the IDE. -->
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-clients_${scala.version}</artifactId>
          <version>${flink.version}</version>
      </dependency>

      <!-- HanLP (portable build bundles its default dictionaries). -->
      <dependency>
          <groupId>com.hankcs</groupId>
          <artifactId>hanlp</artifactId>
          <version>portable-1.7.6</version>
      </dependency>
  </dependencies>

训练

negDir 和 posDir 分别是存放负面语料和正面语料的文件夹（会递归读取子目录），其中的语料为 txt 文件，每个非空行作为一条训练样本。

  1. public class NlpTrain {
  2. public static void main(String[] args) {
  3. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  4. String negDir="E:\\nlp\\sample\\neg";
  5. String posDir="E:\\nlp\\sample\\pos";
  6. DataStream<TranObj> negData=env.readFile(FileRead.getDirTextInputFormat(negDir),negDir)
  7. .assignTimestampsAndWatermarks(new WatermarkStrageWithTimestamp<>())
  8. .filter(new FilterFunction<String>() {
  9. @Override
  10. public boolean filter(String s) throws Exception {
  11. return s.length()>0;
  12. }
  13. })
  14. .map(new MapFunction<String, TranObj>() {
  15. @Override
  16. public TranObj map(String s) throws Exception {
  17. return new TranObj("负向", s);
  18. }
  19. });
  20. DataStream<TranObj> posData=env.readFile(FileRead.getDirTextInputFormat(posDir),posDir)
  21. .assignTimestampsAndWatermarks(new WatermarkStrageWithTimestamp<>())
  22. .filter(new FilterFunction<String>() {
  23. @Override
  24. public boolean filter(String s) throws Exception {
  25. return s.length()>0;
  26. }
  27. })
  28. .map(new MapFunction<String, TranObj>() {
  29. @Override
  30. public TranObj map(String s) throws Exception {
  31. return new TranObj("正向", s);
  32. }
  33. });;
  34. negData.union(posData).keyBy(TranObj::getType)
  35. .window(EventTimeSessionWindows.withGap(Time.minutes(1)))
  36. .aggregate(new AggregateFunction<TranObj, List<TranObj>, Map<String, List<String>>>() {
  37. @Override
  38. public List<TranObj> createAccumulator() {
  39. return new ArrayList<>();
  40. }
  41. @Override
  42. public List<TranObj> add(TranObj tranObj, List<TranObj> tranObjs) {
  43. tranObjs.add(tranObj);
  44. return tranObjs;
  45. }
  46. @Override
  47. public Map<String, List<String>> getResult(List<TranObj> tranObjs) {
  48. List<String> list=new ArrayList<>();
  49. String key=tranObjs.get(0).getType();
  50. for (TranObj obj : tranObjs){
  51. list.add(obj.getSample());
  52. }
  53. Map<String, List<String>> map=new HashMap<>();
  54. map.put(key,list);
  55. return map;
  56. }
  57. @Override
  58. public List<TranObj> merge(List<TranObj> tranObjs, List<TranObj> acc1) {
  59. tranObjs.addAll(acc1);
  60. return tranObjs;
  61. }
  62. }).windowAll(EventTimeSessionWindows.withGap(Time.minutes(1)))
  63. .reduce(new ReduceFunction<Map<String, List<String>>>() {
  64. @Override
  65. public Map<String, List<String>> reduce(Map<String, List<String>> map1, Map<String, List<String>> map2) throws Exception {
  66. for (String key:map1.keySet()){
  67. if (map2.containsKey(key)){
  68. map1.get(key).addAll(map2.get(key));
  69. }
  70. }
  71. for (String key:map2.keySet()){
  72. if (!map1.containsKey(key)){
  73. map1.put(key,map2.get(key));
  74. }
  75. }
  76. return map1;
  77. }
  78. }).map(new MapFunction<Map<String, List<String>>, Map<String, String[]>>() {
  79. @Override
  80. public Map<String, String[]> map(Map<String, List<String>> stringListMap) throws Exception {
  81. Map<String, String[]> tranMap=new HashMap<>();
  82. for (String k:stringListMap.keySet()){
  83. String[] arr=new String[stringListMap.get(k).size()];
  84. tranMap.put(k,stringListMap.get(k).toArray(arr));
  85. }
  86. HanlpModel.trainNaiveBayesModel(tranMap, "E:\\log\\test.ser");
  87. return tranMap;
  88. }
  89. });
  90. try {
  91. env.execute();
  92. } catch (Exception e) {
  93. e.printStackTrace();
  94. }
  95. }
  96. }

 HanlpModel.java

  1. public class HanlpModel {
  2. public static void trainNaiveBayesModel(Map<String, String[]> map, String path) {
  3. IClassifier classifier = new NaiveBayesClassifier();
  4. classifier.train(map);
  5. NaiveBayesModel model = (NaiveBayesModel) classifier.getModel();
  6. IOUtil.saveObjectTo(model, path);
  7. }
  8. public static NaiveBayesModel getNaiveBayesModel(String path){
  9. return (NaiveBayesModel)IOUtil.readObjectFrom(path);
  10. }
  11. }

WatermarkStrageWithTimestamp.java

  1. public class WatermarkStrageWithTimestamp<T> implements WatermarkStrategy<T> {
  2. @Override
  3. public WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
  4. return new BoundedOutOfOrdernessWatermarks<>(Duration.ofSeconds(10));
  5. }
  6. @Override
  7. public TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
  8. return new TimestampAssigner<T>() {
  9. @Override
  10. public long extractTimestamp(T t, long l) {
  11. return System.currentTimeMillis();
  12. }
  13. };
  14. }
  15. }

FileRead.java 

  1. public class FileRead {
  2. public static TextInputFormat getDirTextInputFormat(String dir){
  3. Path path = new Path(dir);
  4. Configuration configuration = new Configuration();
  5. configuration.setBoolean("recursive.file.enumeration", true);
  6. TextInputFormat textInputFormat = new TextInputFormat(path);
  7. textInputFormat.supportsMultiPaths();
  8. textInputFormat.configure(configuration);
  9. textInputFormat.setCharsetName("UTF-8");
  10. return textInputFormat;
  11. }
  12. }

测试（加载训练好的模型，对测试文件中的每一行进行情感分类并打印结果）

  1. public class Test {
  2. public static void main(String[] args) {
  3. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  4. env.readTextFile("E:\\log\\test.txt").filter(new FilterFunction<String>() {
  5. @Override
  6. public boolean filter(String input) throws Exception {
  7. return input.length()>0;
  8. }
  9. }).map(new MapFunction<String, DataObj>() {
  10. @Override
  11. public DataObj map(String input) throws Exception {
  12. return new DataObj(input);
  13. }
  14. }).map(new MapFunction<DataObj, String>() {
  15. @Override
  16. public String map(DataObj dataObj) throws Exception {
  17. IClassifier classifier = new NaiveBayesClassifier(HanlpModel.getNaiveBayesModel("E:\\log\\test.ser"));
  18. return classifier.classify(dataObj.getContent())+":"+dataObj.getContent();
  19. }
  20. }).print();
  21. try {
  22. env.execute();
  23. } catch (Exception e) {
  24. e.printStackTrace();
  25. }
  26. }
  27. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/很楠不爱3/article/detail/428047
推荐阅读
相关标签
  

闽ICP备14008679号