# Environment setup -- run these in a shell or notebook cell, not as Python:
#   pip install transformers
#   pip install torch
#   pip install torch torchvision
#   conda install pytorch torchvision torchaudio -c pytorch
"""Style-change / authorship experiment built on BERT embeddings.

For each document the script (1) fits a logistic regression on mean-pooled
BERT paragraph embeddings to label paragraphs with a style change, (2) derives
a multi-author flag from those labels, and (3) assigns an author to every
paragraph with a BERT sequence classifier. Per-document predictions go to
``test.csv`` and the scores to ``test_analysis.csv``.
"""
import ast
import csv
import glob
import json
import math
import operator
import os
import re
import string
from collections import defaultdict

import nltk
import numpy as np
import pandas as pd
import torch
import transformers
#from hmmlearn import hmm
from nltk.corpus import stopwords
from nltk.stem import PorterStemmer
from nltk.tokenize import word_tokenize
from scipy.stats import chi2_contingency
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import f1_score, accuracy_score
from transformers import BertTokenizer, BertForSequenceClassification

# Document ids that the evaluation loop skips.
# NOTE(review): the original script does not record why these are excluded.
EXCLUDED_IDS = frozenset(
    {71, 301, 340, 642, 700, 1752, 1823, 2019, 2021, 2022, 2096}
)
# Only documents whose id is at most this value are evaluated.
MAX_DOCUMENT_ID = 1000
# Documents that split into more pieces than this lose their last piece.
MAX_PIECES_KEPT_WHOLE = 10


def createCsv(rout, csv_name):
    """Collect every ``*.txt`` / ``*.json`` pair in *rout* into one CSV file.

    A text file and a JSON file belong together when the part of their file
    name after the last ``-`` (and before the first ``.``) is equal.

    Args:
        rout: Directory that holds the ``.txt`` documents and ``.json`` truths.
        csv_name: Path of the CSV file to write (overwritten if it exists).

    The CSV columns are ``id``, ``multi-author``, ``changes``, ``para_author``
    and ``paragraph``. ``paragraph`` is the list returned by ``readlines()``,
    which ``csv.writer`` stores as its string representation.
    """
    header = ['id', 'multi-author', 'changes', 'para_author', 'paragraph']

    def file_id(path):
        # Work on the file name only, so dots or dashes in directory names
        # (e.g. "./data") cannot corrupt the extracted id.
        return os.path.basename(path).split('.')[0].split('-')[-1]

    # Index the JSON files by id once instead of rescanning them per text file.
    json_by_id = defaultdict(list)
    for json_path in glob.glob(rout + '/*.json'):
        json_by_id[file_id(json_path)].append(json_path)

    records = []
    for txt_path in glob.glob(rout + '/*.txt'):
        for json_path in json_by_id.get(file_id(txt_path), ()):
            print(txt_path)
            with open(txt_path, 'r', encoding='utf-8') as f1:
                paragraph = f1.readlines()
            with open(json_path, 'r', encoding='utf-8') as f2:
                content = json.load(f2)
            records.append((
                file_id(json_path),
                content['multi-author'],
                content['changes'],
                content['paragraph-authors'],
                paragraph,
            ))

    with open(csv_name, 'w', encoding='utf-8', newline='') as file_obj1:
        writer = csv.writer(file_obj1)
        writer.writerow(header)
        writer.writerows(records)


def preprocessing(s):
    """Strip punctuation from *s*, tokenize it and stem every token.

    Args:
        s: Raw text.

    Returns:
        List of Porter-stemmed tokens.
    """
    no_punctuation = s.translate(str.maketrans('', '', string.punctuation))
    ps = PorterStemmer()
    # Tokenize first: the stemmer works on single words, so stemming the
    # whole string in one call (as the code used to) only touched its tail.
    return [ps.stem(token) for token in word_tokenize(no_punctuation)]


def preanalysis(text):
    """Tokenize *text*, drop English stop words and stem what is left.

    Args:
        text: Raw text.

    Returns:
        List of Porter-stemmed tokens without stop words.
    """
    tokens = nltk.word_tokenize(text)
    stop_words = set(stopwords.words('english'))
    stemmer = PorterStemmer()
    return [
        stemmer.stem(word)
        for word in tokens
        if word.lower() not in stop_words
    ]


# using google bert
# Shared tokenizer/encoder used by extract_features(); loaded once at import.
tokenizerchange = transformers.BertTokenizer.from_pretrained('bert-base-uncased')
modelchange = transformers.BertModel.from_pretrained('bert-base-uncased')


def forchanges(changes):
    """Turn the CSV ``changes`` cell into one label per paragraph.

    Args:
        changes: String form of the change list, e.g. ``"[0, 1, 0]"``. It is
            read character by character, so entries must be single characters.

    Returns:
        List of string labels: ``'1'`` for the first paragraph followed by the
        entries of *changes*. Labels are all ``str`` so they agree with the
        ``pos_label='1'`` used when scoring.
    """
    cleaned_list = [s.strip("[],' ") for s in changes if s not in (",", " ")]
    # The first and last items are the emptied "[" and "]" characters.
    return ['1'] + cleaned_list[1:-1]


def forchanges1(changes):
    """Strip list punctuation from each character of *changes*.

    Unlike forchanges(), nothing is dropped or prepended, so bracket
    characters remain in the result as empty strings.
    """
    cleaned_list = [s.strip("[],' ") for s in changes if s not in (",", " ")]
    return cleaned_list


# Define a function to predict the author of new paragraphs
def predict_author(texts, tokenizer, model):
    """Return the arg-max class index predicted by *model* for each text.

    Args:
        texts: Iterable of paragraph strings.
        tokenizer: Tokenizer called as ``tokenizer(text, ...)``.
        model: Classifier whose output exposes ``.logits``.

    Returns:
        List with one predicted class index per text.
    """
    predicted_authors = []
    for text in texts:
        tokenized_text = tokenizer(
            text, padding=True, truncation=True, return_tensors='pt'
        )
        # Inference only, so no gradients need to be tracked.
        with torch.no_grad():
            outputs = model(**tokenized_text)
        predicted_authors.append(torch.argmax(outputs.logits).item())
    return predicted_authors


def extract_features(paragraphs):
    """Embed *paragraphs* as the mean of BERT's last hidden states.

    Args:
        paragraphs: A string or list of strings accepted by the tokenizer.

    Returns:
        NumPy array with one embedding row per input paragraph.
    """
    encoded_inputs = tokenizerchange(
        paragraphs, padding=True, truncation=True, return_tensors='pt'
    )
    with torch.no_grad():
        outputs = modelchange(**encoded_inputs)
    embeddings = outputs.last_hidden_state.mean(dim=1)
    return embeddings.numpy()


def _split_paragraphs(cell):
    """Split the CSV ``paragraph`` cell into pieces of text."""
    # The cell is the string form of a list of lines, so line breaks show up
    # as the two characters backslash + "n".
    pieces = cell.split('\\n')
    if len(pieces) > MAX_PIECES_KEPT_WHOLE:
        # The code used to bind the *popped string* here, which made every
        # later step iterate over characters. Dropping the last piece and
        # keeping the list is the presumed intent -- TODO confirm.
        pieces = pieces[:-1]
    return pieces


def _score_multiauthor(predicted, actual):
    """Print and return 1 when the multi-author flag is right, else 0."""
    score = 1 if int(predicted) == int(actual) else 0
    print(f"F1 score of multiauthor: {score:.2f}")
    return score


def _evaluate_changes(features, labels, multiple, num_paragraphs):
    """Fit and score the per-document style-change classifier.

    Returns:
        Tuple ``(predicted_changes, multiauthor, multiauthorf1, changesf1,
        changesac)``. When fitting or scoring fails, the document is treated
        as multi-author and the change scores fall back to fixed values.
    """
    # Bound up front so a failure inside fit() cannot leave it undefined.
    predicted_changes = []
    try:
        # Train a logistic regression model on the BERT embeddings
        clf1 = LogisticRegression()
        clf1.fit(features, labels)
        # Predictions are made on the same embeddings the model was fitted on.
        predicted_changes = clf1.predict(features)
        sumchange = sum(int(x) for x in predicted_changes)
        print('Number of segments with style changes:', sumchange)
        # The first paragraph is always labelled 1, so a total of 1 means
        # that no later paragraph was flagged.
        multiauthor = 0 if sumchange == 1 else 1
        multiauthorf1 = _score_multiauthor(multiauthor, multiple)
        changesf1 = f1_score(labels, predicted_changes, pos_label='1')
        changesac = accuracy_score(labels, predicted_changes)
    except Exception as e:
        # Deliberate best-effort fallback: report the problem and carry on
        # with default values instead of aborting the whole run.
        print(f"Error occurred: {e}")
        multiauthor = 1
        multiauthorf1 = _score_multiauthor(multiauthor, multiple)
        changesf1 = 0
        changesac = 1 / num_paragraphs
    return predicted_changes, multiauthor, multiauthorf1, changesf1, changesac


def _predict_paragraph_authors(paragraphs, authors, tokenizer):
    """Assign an author to every paragraph with a BERT sequence classifier.

    Args:
        paragraphs: List of paragraph strings.
        authors: Known author per paragraph. Extended IN PLACE with its last
            value when shorter than *paragraphs*; the caller scores against
            the padded list.
        tokenizer: BERT tokenizer used to encode the paragraphs.

    Returns:
        List of author numbers; the first paragraph is always author 1.
    """
    # NOTE(review): nothing in this script fine-tunes this model, so its
    # classification head looks untrained -- confirm this is intended.
    model = BertForSequenceClassification.from_pretrained(
        'bert-base-uncased', num_labels=len(paragraphs)
    )

    # map paragraph indices to author indices
    if len(authors) < len(paragraphs):
        authors.extend([authors[-1]] * (len(paragraphs) - len(authors)))

    # tokenize input paragraphs
    encoded = [
        tokenizer.encode_plus(
            paragraph, add_special_tokens=True, return_tensors='pt'
        )
        for paragraph in paragraphs
    ]

    predicted_authors = [1]
    for i in range(1, len(encoded)):
        # concatenate previous paragraph with current paragraph
        # (the code used to join paragraph i with itself)
        previous, current = encoded[i - 1], encoded[i]
        inputs = current.copy()
        for key in ('input_ids', 'token_type_ids', 'attention_mask'):
            inputs[key] = torch.cat([previous[key], current[key]], dim=1)

        # predict author using BERT
        with torch.no_grad():
            outputs = model(**inputs)
        probabilities = torch.softmax(outputs.logits, dim=1)[0].tolist()

        # choose author based on maximum probability
        predicted_author = authors[i]  # default to known author
        max_prob = probabilities[authors[i] - 1]  # probability of known author
        for j in range(1, len(authors) // 2):
            if j != authors[i] and probabilities[j] > max_prob:
                predicted_author = j + 1
                max_prob = probabilities[j]
        predicted_authors.append(predicted_author)
    return predicted_authors


def main():
    """Build the training CSV, then evaluate each validation document."""
    rout1 = "data/train"
    csv_name1 = "train_test.csv"
    createCsv(rout1, csv_name1)
    # NOTE(review): the loop below reads 'validation_test.csv', which this
    # run does not create -- presumably produced by an earlier run; confirm.

    # The tokenizer does not depend on the document, so load it only once.
    tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

    # test.csv stores real and predicted results,
    # test_analysis.csv stores all f1 and accuracy scores
    with open('test.csv', mode='w', newline='') as result, \
            open('test_analysis.csv', mode='w', newline='') as result1:
        writer = csv.writer(result)
        f1_writer = csv.writer(result1)
        # 'pre-para_author' was missing although every row carries it.
        writer.writerow(['id', 'real-multiauthor', 'pre-multiauthor',
                         'real-changes', 'pre-changes', 'real-para_author',
                         'pre-para_author'])
        f1_writer.writerow(['id', 'multi-accuracy', 'changes-f1score',
                            'changes-accuracy', 'para-f1score',
                            'para-accuracy'])

        with open('validation_test.csv', "r", encoding='utf-8') as csvFile:
            rows = csv.reader(csvFile)
            next(rows, None)  # skip the header row
            for row in rows:
                filename = row[0]
                if (int(filename) in EXCLUDED_IDS
                        or int(filename) > MAX_DOCUMENT_ID):
                    continue

                # load the content inside csv
                multiple = row[1]
                changes = row[2]
                authors1 = ast.literal_eval(row[3])
                sentsplit1 = _split_paragraphs(row[-1])

                print("filename")
                print(filename)

                features = extract_features(sentsplit1)
                cleaned_list = forchanges(changes)
                (predicted_changes1, multiauthor, multiauthorf1,
                 changesf1, changesac) = _evaluate_changes(
                    features, cleaned_list, multiple, len(sentsplit1))

                predicted_authors = _predict_paragraph_authors(
                    sentsplit1, authors1, tokenizer)
                print("Predicted authors:", predicted_authors)
                print(authors1)
                f1 = f1_score(authors1, predicted_authors, average='weighted')
                print(f"F1 score: {f1:.2f}")
                accuracy = accuracy_score(authors1, predicted_authors)

                writer.writerow([filename, multiple, multiauthor,
                                 cleaned_list, predicted_changes1, authors1,
                                 predicted_authors])
                f1_writer.writerow([filename, multiauthorf1, changesf1,
                                    changesac, f1, accuracy])


if __name__ == "__main__":
    main()
