diff --git a/Fuzzy_attrib/functions.py b/Fuzzy_attrib/functions.py new file mode 100644 index 0000000000000000000000000000000000000000..0616c3dfdd002f28f5595197aa6ac5baacf217b8 --- /dev/null +++ b/Fuzzy_attrib/functions.py @@ -0,0 +1,1195 @@ +import glob +import numpy as np +import pandas as pd +import re +import os +import codecs +from PyPDF2 import PdfReader +import PyPDF2 +import glob +import shutil +from rapidfuzz import process, fuzz,distance +from tqdm import tqdm +import io,sys +import builtins +import logging +from collections import Counter +import json +import rapidfuzz +from collections import Counter +#import Compare_texts + + +logger = logging.getLogger('ErrorLogger') +# Configure logger to write to a file... + +def my_handler(type, value, tb): + logger.exception("Uncaught exception: {0}".format(str(value))) + +# Install exception handler +sys.excepthook = my_handler + +def print(*args, **kwargs): + """ + Une version personnalisée de la fonction print qui écrit également la sortie dans un fichier de log. + """ + log_file = open("../logs.txt","a",encoding="utf-8") + sep = kwargs.get("sep", " ") + end = kwargs.get("end", "\n") + file = kwargs.get("file",log_file) + print_str = sep.join([str(arg) for arg in args]) + end + file.write(print_str) + file.flush() + builtins.print(*args,**kwargs) + + + +def identifier_document_type(plf_a_trier_location): + """ + Identifie le type de document en tentant de lire une liasse soit de l'Assemblée Nationale (AN), + soit du Sénat, en utilisant leur format spécifique de lecture. + + Args: + plf_a_trier_location (str): Le chemin d'accès au document à identifier. + + Returns: + tuple: Contient le dictionnaire des données lues et le type de document identifié ("AN", "SENAT", ou "null" si non identifié). + """ + doc_type = "null" + d = None + + # Essayer de lire comme une liasse de l'Assemblée Nationale + try: + d = read_liasse(plf_a_trier_location) + print("Le document est une liasse de l'Assemblée Nationale.") + doc_type = "AN" + except Exception as e: + print("Le document n'est pas une liasse de l'Assemblée Nationale.") + print(e) + + # Si échoué, essayer de lire comme une liasse du Sénat + try: + d = read_liasse_senat(plf_a_trier_location) + print("Le document est une liasse du Sénat.") + doc_type = "SENAT" + except Exception as e: + print("Le document n'est pas une liasse du Sénat.") + print(e) + + return d, doc_type + + +def get_attrib_score(attributed,nattributed): + """ + Affiche le nombre d'amendements déjà triés, le nombre d'amendements à trier, + et calcule le score d'attribution. + + Cette fonction calcule et affiche le score d'attribution basé sur le ratio entre + le nombre d'amendements déjà attribués et le total d'amendements (attribués et non attribués). + + Args: + attributed (DataFrame): DataFrame contenant les amendements déjà attribués. + nattributed (DataFrame): DataFrame contenant les amendements non attribués. + + Affiche le nombre d'amendements dans chaque catégorie et le score d'attribution. + """ + print(str(np.shape(attributed)[0]) + " amendements déjà triés") + print(str(np.shape(nattributed)[0])+" amendements à trier") + print("score d'attribution :"+ str(np.shape(attributed)[0]/(np.shape(nattributed)[0]+np.shape(attributed)[0]))) + +def remplacement(match): + """ + Remplace les espaces dans une chaîne capturée par une expression régulière. + Args: + texte (str): La chaîne de caractères à filtrer. + + Returns: + str: La chaîne résultante ne contenant que des chiffres. + """ + return match.group(1) + re.sub(r'\s+', '', match.group(2)) + +def garder_seulement_chiffres(texte): + """ + Supprime tous les caractères non numériques d'une chaîne de caractères. + """ + return re.sub(r'\D', '', texte) + + +def uniformiser_numero(texte): + """ + Uniformise le format des numéros dans une chaîne de caractères issue de l'extraction des liasses AN ou Senat. Le but du jeu est de toujours finir avec un amendement référencé de la sorte: N° I-numero + + Cette fonction effectue plusieurs remplacements et nettoyages pour standardiser + le format des numéros mentionnés dans le texte. Les étapes incluent la conversion + des abréviations "(N°" en "(Numéro", la standardisation du préfixe des numéros, + et l'unification des formats de numérotation pour les suivis de certains préfixes spécifiques. + + Args: + texte (str): La chaîne de caractères contenant les numéros à uniformiser. + + Returns: + str: La chaîne de caractères avec les numéros uniformisés. + + Les modifications effectuées sont : + - Remplacement de "(N°" par "(Numéro". + - Ajout d'un préfixe "I-" devant les numéros qui sont précédés de "N° " et + éventuellement de "CF", suivi directement par des chiffres. + - Uniformisation du format de numérotation pour une liste spécifiée de préfixes de comissions + (par exemple, "AS", "AC", "CF", etc.) en les remplaçant tous par "I-" suivi des chiffres. + - Remplacement des préfixes "II-" et "III-" par "I-" pour les numéros précédés de "N° ". + """ + # Remplacer les occurrences de "(N°" par "(Numéro" + texte = re.sub(r'\(N°', '(Numéro', texte) + + texte = re.sub(r'N° \s*(CF)?(\d+)', r'N° I-\1\2', texte) + + # Uniformiser le format du numéro pour "N° ..." + texte = re.sub(r'N°\s*(I+-CF|I+|CF|I-CF|AS|AC|CE|CS|I-AS|I-AC|I-CE|I-CS)?(\d+)', r'N° I-\2', texte) + + # Remplacer ensuite les préfixes "II-" et "III-" par "I-" + texte_uniforme = re.sub(r'N° I{2,3}-', 'N° I-', texte) + + return texte_uniforme + +def read_liasse(liste_doc): + """ + Lit une série de documents PDF listés dans un répertoire et extrait le texte associé à chaque amendement. + + Cette fonction parcourt les fichiers PDF spécifiés dans le chemin fourni par `liste_doc`, extrait le texte de chaque page, + et assemble le texte de chaque amendement. Chaque amendement est identifié par un numéro unique extrait du nom du fichier + et potentiellement par des indicateurs dans le texte. Le texte extrait est ensuite stocké dans un dictionnaire avec le numéro + d'amendement comme clé. + + Args: + liste_doc (str): Le chemin vers le répertoire contenant les fichiers PDF à lire. Le chemin doit se terminer par + le nom du répertoire, sans wildcard pour les fichiers (la fonction ajoute "\\*.pdf" pour trouver + tous les PDF dans le répertoire). + + Returns: + dict: Un dictionnaire où les clés sont les numéros des amendements (int) et les valeurs sont le texte complet (str) + de chaque amendement extrait des fichiers PDF. + + Le processus inclut l'uniformisation des numéros d'amendement dans le texte pour faciliter leur identification, ainsi que + la gestion des cas où le texte d'un amendement s'étend sur plusieurs pages. La fonction imprime également des informations + de suivi pendant son exécution, telles que le nom du document actuellement traité et le nombre d'amendements traités. + """ + liste_doc = glob.glob(liste_doc+"\\*.pdf") + + k = [int(docname.split("\\")[-1].split(" ")[4]) for docname in liste_doc] + liste_amendements, num_init, d = [], 1, {} + + for j in list(np.argsort(k)): + docname = liste_doc[j] + print(f"Traitement du document : {docname}") + reader = PyPDF2.PdfReader(docname) + num_init = int(docname.split("\\")[-1].split(" ")[4]) + print(f"Numéro initial de l'amendement : {num_init}") + print(f"Nombre actuel d'amendements traités : {len(liste_amendements)}") + text = "" + # Itérer sur chaque page du document PDF avec une barre de progression + for i in tqdm(range(len(reader.pages))): + # Pré-traitement du texte de la page suivante pour détecter les changements d'amendements + if i != len(reader.pages) - 1: # Si ce n'est pas la dernière page + next_page_text = reader.pages[i + 1].extract_text().replace("N o", "N°") + # Appliquer l'uniformisation et le remplacement des numéros sur le texte de la page suivante + correction = uniformiser_numero(re.sub(r'(N° I-.*?)((?:\s+\d+)+)', remplacement, next_page_text)) + + # Extraire et uniformiser le texte de la page actuelle + current_page_text = uniformiser_numero(reader.pages[i].extract_text()) + text += current_page_text # Concaténer le texte à la variable accumulatrice 'text' + + # Vérifier si le numéro d'amendement actuel ne figure pas dans le texte de la page suivante + # Indiquant la fin de l'amendement actuel et le début d'un nouveau + if i != len(reader.pages) - 1 and f"N° I-{num_init}" not in correction: + d[num_init] = text # Assigner le texte accumulé à l'amendement actuel + text = "" # Réinitialiser le texte pour le prochain amendement + # Mise à jour de num_init pour le prochain amendement basé sur le texte de correction + num_init = int(garder_seulement_chiffres(re.sub(r'\D', '', correction[correction.find("N° I-")+5:correction.find("N° I-")+9]))) + elif i == len(reader.pages) - 1: # Pour la dernière page, assigner le texte restant + d[num_init] = text + num_init += 1 # Préparer num_init pour un potentiel usage futur + return(d) + +def read_liasse_senat(liste_doc_path): + """ + Lit et traite les documents PDF de liasses du Sénat situés dans un répertoire spécifié, + en extrayant et nettoyant le texte de chaque page, et en regroupant ce texte par numéro d'amendement. + + La fonction parcourt tous les fichiers PDF dans le répertoire donné, nettoie le texte extrait pour uniformiser + certaines chaînes de caractères, et regroupe le texte par amendement en se basant sur la détection de numéros + d'amendement uniques dans le texte. + + Args: + liste_doc_path (str): Le chemin du répertoire contenant les documents PDF à lire. + + Returns: + dict: Un dictionnaire où chaque clé est un numéro d'amendement (int) et chaque valeur est le texte (str) + associé à cet amendement, nettoyé et concaténé à partir des pages du document PDF. + """ + # Définir les chemins vers les documents PDF + liste_doc = glob.glob(f"{liste_doc_path}\\*.pdf") + + # Fonction interne pour nettoyer et préparer le texte extrait + def clean_and_prepare_text(text): + replacements = { + "II-": "I-", "N° FINC.": "N° I-", "n° FINC.": "N° I-", + "A M E N D E M E N T": "AMENDEMENT", "_________________": "------", + "Suite amdt N° I-": "", "Suite amd N° I-": "", "OBJET": "Objet" + } + for old, new in replacements.items(): + text = text.replace(old, new) + return text + + # Fonction interne pour extraire un numéro d'amendement à partir du texte + def extract_amendment_number(text): + match = re.search(r'N° I-(\d+)', text) + return int(match.group(1)) if match else None + + d = {} + for docname in tqdm(liste_doc): + print(f"Traitement du document : {docname}") + reader = PyPDF2.PdfReader(docname) + text_accumulator = "" + num_init = None + + for i, page in enumerate(reader.pages): + current_text = clean_and_prepare_text(page.extract_text()) + + # Mise à jour du numéro d'amendement si un nouveau est trouvé dans le texte actuel + if "N° I-" in current_text and (num_init is None or i != len(reader.pages) - 1): + new_num_init = extract_amendment_number(current_text) + if new_num_init and new_num_init != num_init: + if num_init is not None: + d[num_init] = text_accumulator + text_accumulator = "" + num_init = new_num_init + + text_accumulator += current_text + + # Enregistrer le texte accumulé si c'est la dernière page + if i == len(reader.pages) - 1 and num_init is not None: + d[num_init] = text_accumulator + + return d + +def get_retires_and_inexistants(nattributed, moulinette2, d): + """ + Identifie les amendements inexistants ou retirés avant publication ou déclarés irrecevables. + + Args: + nattributed (DataFrame): DataFrame contenant les numéros d'amendements non attribués. + moulinette2 (DataFrame): DataFrame pour mettre à jour le statut des amendements. + d (dict): Dictionnaire contenant le corps des amendements avec leur numéro comme clé. + + Returns: + DataFrame: Le DataFrame moulinette2 mis à jour avec les statuts "inexistant" ou "Retiré" pour les amendements concernés. + """ + inexistant_atm = [] # Liste pour stocker les amendements inexistants + retire_av_pub = [] # Liste pour stocker les amendements retirés avant publication + + # Parcourir la liste des numéros d'amendements non attribués + for i in list(nattributed["N ° amdt"]): + if i != 0: # Ignorer l'amendement numéro 0 s'il est présent + try: + # Vérifier si l'amendement a été retiré avant publication ou déclaré irrecevable + if (fuzz.partial_ratio("Retiré avant publication", d[i].replace("\n", "")) > 80 or + fuzz.partial_ratio("Cet amendement a été retiré avant séance", d[i].replace("\n", "")) > 80 or + fuzz.partial_ratio("Cet amendement a été déclaré irrecevable après diffusion en application de l'article 98 du règlement de l'Assemblée nationale.", d[i].replace("\n", "")) > 80): + retire_av_pub.append(i) + else: + try: + # Tentative d'extraction d'une section spécifique de l'amendement + cartouche = d[i].replace("\n", " ").split("----")[0] + except KeyError: + # L'amendement est considéré comme inexistant s'il n'est pas trouvé + print(f"Amendement {i} non trouvé.") + inexistant_atm.append(i) + except KeyError: + # L'amendement est marqué inexistant s'il déclenche une KeyError lors de l'accès + inexistant_atm.append(i) + + # Mettre à jour le DataFrame pour les amendements inexistants + for i in inexistant_atm: + moulinette2.loc[i-1, "Attribution finale"] = "inexistant" + print(f"amendement {i} est inexistant") + + # Mettre à jour le DataFrame pour les amendements retirés + for i in retire_av_pub: + moulinette2.loc[i-1, "Attribution finale"] = "Retiré" + print(f"amendement {i} est Retiré") + + return moulinette2 + + +def get_retires_and_inexistants_senat(nattributed, moulinette2, d): + """ + Identifie les amendements inexistants ou retirés avant publication au Sénat. + + Args: + nattributed (DataFrame): DataFrame contenant les numéros d'amendements non attribués. + moulinette2 (DataFrame): DataFrame où mettre à jour le statut des amendements. + d (dict): Dictionnaire contenant le corps des amendements avec leurs numéros comme clés. + + Returns: + DataFrame: Le DataFrame moulinette2 mis à jour avec les statuts "inexistant" ou "Retiré" pour les amendements concernés. + """ + # Initialisation des listes pour stocker les amendements inexistants et retirés avant publication + inexistant_atm = [] + retire_av_pub = [] + + # Parcours des numéros d'amendements non attribués + for i in list(nattributed["N ° amdt"]): + if i != 0: # On ignore l'amendement numéro 0 s'il existe + if i not in list(d.keys()): + # Si le numéro d'amendement n'existe pas dans le dictionnaire, il est considéré comme inexistant + inexistant_atm.append(i) + elif (fuzz.partial_ratio("Retiré avant publication", d[i].replace("\n", "")) > 80 or + fuzz.partial_ratio("Cet amendement a été retiré avant séance", d[i].replace("\n", "")) > 80): + # Si le corps de l'amendement contient des mentions indiquant un retrait avant publication avec un certain seuil de similarité, il est considéré comme retiré + retire_av_pub.append(i) + + # Mise à jour du DataFrame moulinette2 pour marquer les amendements inexistants + for i in inexistant_atm: + moulinette2.loc[i-1, "Attribution finale"] = "inexistant" + print(f"amendement {i} est inexistant") + + # Mise à jour du DataFrame moulinette2 pour marquer les amendements retirés avant publication + for i in retire_av_pub: + moulinette2.loc[i-1, "Attribution finale"] = "Retiré" + print(f"amendement {i} est Retiré") + + return moulinette2 + +def get_corps_amendements(amendments): + """ + Extrait différentes parties du corps des amendements. + + Args: + amendments (dict): Dictionnaire contenant les amendements, où chaque clé est un identifiant unique. + + Returns: + tuple: Trois dictionnaires contenant différentes extractions du corps des amendements. + """ + + # Initialisation des dictionnaires pour stocker les différentes extractions + short_body_extracts = dict() # Extraits courts du corps + long_body_extracts = dict() # Extraits longs du corps + full_bodies = dict() # Corps complets des amendements + + # Itération sur chaque amendement en utilisant tqdm pour une barre de progression + for amendment_id in tqdm(list(amendments.keys())): + # On ignore l'identifiant 0 s'il est présent + if amendment_id != 0: + try: + # Tentative d'extraction d'un extrait court du corps de l'amendement + split_point = amendments[amendment_id].split("------")[1].split("\n")[2] + short_extract = "".join(amendments[amendment_id].split(split_point)[1].split("\n")[1:3]) + except: + # En cas d'échec, utiliser le corps complet comme extrait court + short_extract = amendments[amendment_id] + + try: + # Tentative d'extraction d'un extrait long en cherchant "l'article suivant:" + long_extract = amendments[amendment_id].split("l'article suivant:")[1].split("perte de recettes")[0] + except: + try: + # Si la première tentative échoue, essayer une extraction basée sur "------" + long_extract = amendments[amendment_id].split("------")[1].split("perte de recettes")[0] + except: + # En cas d'échec total, utiliser le corps complet + long_extract = amendments[amendment_id] + + # Assignation des extraits et du corps complet aux dictionnaires appropriés + short_body_extracts[amendment_id] = short_extract + long_body_extracts[amendment_id] = long_extract + full_bodies[amendment_id] = amendments[amendment_id] + + # Retourner les trois dictionnaires contenant les extractions + return short_body_extracts, long_body_extracts, full_bodies + + +def get_corps_amendements_senat(d): + """ + Extrait des sections spécifiques du corps des amendements pour le Sénat. + + Args: + amendments (dict): Dictionnaire contenant les amendements, où chaque clé est un identifiant unique d'amendement. + + Returns: + tuple: Trois dictionnaires contenant respectivement des extraits courts, des extraits alternatifs courts, + et les corps complets de chaque amendement. + """ + ###On récupère le corps des amendements + corps_dict = dict() + corps_dict2 = dict() + corps_dict3 = dict() + for i in tqdm(list(d.keys())): + if i != 0: + try: + corps_small = d[i].split(d[i].split("AMENDEMENT")[1].split("\n")[3])[1].split("La perte de recettes")[0] + if "Objet" in corps_small: + corps_small = d[i].split(d[i].split("AMENDEMENT")[1].split("\n")[3])[1].split("Objet")[0] + except: + try: + corps_small = d[i].split(d[i].split("AMENDEMENT")[1].split("\n")[3])[1].split("Objet")[0] + except: + print(i) + if (corps_small == '\n'): + corps_small = d[i].split("------")[1].split("La perte de recettes")[0].split("Objet")[0].replace("\n"," ") + corps_dict2[i] = corps_small + corps_dict[i] = corps_small + corps_dict3[i] = d[i] + return(corps_dict,corps_dict2,corps_dict3) + + +def mapping_PLF_function(corps_dict2): + """ + Crée un dictionnaire de mappage pour les clés d'un dictionnaire donné, en associant chaque clé à un indice numérique. + + Exemple d'utilisation: + >>> corps_dict2 = {"Article 1": "Texte de l'article 1", "Article 2": "Texte de l'article 2"} + >>> mapping_PLF = mapping_PLF_function(corps_dict2) + >>> print(mapping_PLF) + {0: 'Article 1', 1: 'Article 2'} + + Cette fonction est particulièrement utile pour créer des indices numériques facilitant l'itération, le référencement, + ou l'accès direct aux éléments d'un dictionnaire basé sur des identifiants textuels ou autres non numériques. + """ + # Utilisation de la compréhension de dictionnaire pour créer mapping_PLF + mapping_PLF = {i: key for i, key in enumerate(corps_dict2.keys())} + return mapping_PLF + + + +def write_PLF(corps_dict2): + """ + Écrit le contenu d'un dictionnaire représentant les corps d'articles de PLF dans un fichier JSON. + + Args: + corps_dict2 (dict): Le dictionnaire contenant les corps d'articles de PLF à écrire dans un fichier JSON. + Les clés du dictionnaire sont les numéros et les valeurs des chaînes de caractères + représentant le contenu textuel de chaque article. + + Aucune valeur n'est retournée. La fonction crée ou écrase le fichier "Corps_Articles_PLF.json" dans le répertoire + """ + # Utilisation du contexte with pour ouvrir le fichier + with open("../Corps_Articles_PLF.json", "w") as f: + # Écriture directe du dictionnaire dans le fichier sous forme de JSON + json.dump(corps_dict2, f) + +def compare_same_PLF(corps_dict2,mapping_PLF,nattributed,attributed,moulinette2,ressemblance): + """ + Compare les amendements d'un même PLF pour identifier et traiter les cas de similitudes basées sur un score de ressemblance. + + Cette fonction examine les similitudes entre les textes des amendements et, en fonction de leur score de ressemblance, + effectue des mises à jour d'attribution si nécessaire. Elle prend en compte uniquement les amendements non attribués + et compare chaque amendement avec d'autres pour identifier les possibles doublons ou similitudes. + + Args: + corps_dict2 (dict): Dictionnaire contenant le texte des amendements. + mapping_PLF (list): Liste des numéros d'amendements. + nattributed (DataFrame): DataFrame des amendements non attribués. + attributed (DataFrame): DataFrame des amendements déjà attribués. + moulinette2 (DataFrame): DataFrame global pour mise à jour des attributions. + ressemblance (int): Seuil de score pour considérer deux amendements comme similaires. + + Returns: + DataFrame: Le DataFrame moulinette2 mis à jour avec les nouvelles attributions effectuées. + """ + # Fonction interne pour vérifier et mettre à jour l'attribution basée sur le score + def check_and_update_attribution(i, j, score): + # Condition pour vérifier si l'amendement actuel doit être attribué ou signalé comme trop court + if len(corps_dict2[mapping_PLF[i]].split("\n")) < 5 and score_list[i][j] < (ressemblance + 10 if ressemblance < 90 else 100): + print(f"ATTENTION PAS ATTRIBUE amendement numero {mapping_PLF[i]} est similaire à {[mapping_PLF[duplicates_list[i][j]], score_list[i][j]]} mais trop court pour le score") + elif moulinette2.loc[mapping_PLF[duplicates_list[i][j]]-1, "Attribution finale"] not in ["NF", "Retiré"]: + print(f"amendement numero {mapping_PLF[i]} est similaire à {[mapping_PLF[duplicates_list[i][j]], score_list[i][j]]} on attribue à {moulinette2.loc[mapping_PLF[duplicates_list[i][j]]-1,'Attribution finale']}") + moulinette2.loc[mapping_PLF[i]-1, "Attribution finale"] = moulinette2.loc[mapping_PLF[duplicates_list[i][j]]-1, "Attribution finale"] + return score_list[i][j] + return score + + elements = [i.replace("\n","").replace(" "," ") for i in list(corps_dict2.values())] + sa = process.cdist(elements, elements, score_cutoff=ressemblance,workers=-1) + + duplicates_list = [] + score_list = [] + + for distances in sa: + # Get indices of duplicates + indices = np.argwhere(~np.isin(distances, [0, 0.0])).flatten() + duplicates_list.append(indices.tolist()) + # Extraction des scores associés à chaque duplicata identifié + scores = [distances[i] for i in indices] + score_list.append(scores) + for l in range(2): + print(f"{l+1}eme tour") + for i in range(len(score_list)): + if mapping_PLF[i] in nattributed["N ° amdt"].tolist(): + highest_score = 0 + if len(duplicates_list[i]) > 1: + for j in range(len(duplicates_list[i])): + if mapping_PLF[duplicates_list[i][j]] != mapping_PLF[i] and mapping_PLF[duplicates_list[i][j]] in attributed["N ° amdt"].tolist(): + try: + if score_list[i][j] > highest_score: + highest_score = check_and_update_attribution(i, j, highest_score) + except Exception as e: + print(f"Error processing amendment {mapping_PLF[i]}: {e}") + continue + + return moulinette2 + +def compare_same_PLF_senat(corps_dict2,mapping_PLF,nattributed,attributed,moulinette2,ressemblance): + """ + Compare les amendements d'un même PLF du Sénat pour identifier et traiter les cas de similitudes + basées sur un score de ressemblance. Met à jour les attributions d'amendements en fonction des similitudes trouvées. + + Args: + corps_dict2 (dict): Dictionnaire contenant le texte des amendements. + mapping_PLF (list): Liste des numéros d'amendements. + nattributed (DataFrame): DataFrame des amendements non attribués. + attributed (DataFrame): DataFrame des amendements déjà attribués. + moulinette2 (DataFrame): DataFrame global pour la mise à jour des attributions. + ressemblance (int): Seuil de score pour considérer deux amendements comme similaires. + + Returns: + DataFrame: Le DataFrame moulinette2 mis à jour avec les nouvelles attributions réalisées. + """ + elements = [i.replace("\n","").replace(" "," ") for i in list(corps_dict2.values())] + sa = process.cdist(elements, elements, score_cutoff=ressemblance,workers=-1) + + duplicates_list = [] + score_list = [] + + for distances in sa: + # Get indices of duplicates + indices = np.argwhere(~np.isin(distances, [0, 0.0])).flatten() + # Get names from indices + names = [x for x in indices]#indices#names = list(map(elements.__getitem__, indices)) + duplicates_list.append(names) + + scores = [distances[x] for x in indices] + score_list.append(scores) + + # Boucle principale pour comparer et potentiellement mettre à jour les attributions + for l in range(2): + print(f"{l+1}eme tour") + for i, scores in enumerate(score_list): + if mapping_PLF[i] in nattributed["N ° amdt"].values: + highest_score = 0 + for j, score in enumerate(scores): + if mapping_PLF[duplicates_list[i][j]] != mapping_PLF[i] and mapping_PLF[duplicates_list[i][j]] in attributed["N ° amdt"].values: + condition = len(corps_dict2[mapping_PLF[i]].split("\n")) < 5 and score < (ressemblance + 10 if ressemblance < 90 else 100) + # Logique spécifique pour les amendements similaires mais courts + if condition: + print(f"ATTENTION PAS ATTRIBUE amendement numero {mapping_PLF[i]} est similaire à {mapping_PLF[duplicates_list[i][j]], score} mais trop court pour le score") + elif "Supprimer cet article" not in corps_dict2[mapping_PLF[i]]: + # Mise à jour de l'attribution si les conditions sont remplies + print(f"amendement numero {mapping_PLF[i]} est similaire à {mapping_PLF[duplicates_list[i][j]], score} on attribue à {moulinette2.loc[mapping_PLF[duplicates_list[i][j]]-1, 'Attribution finale']}") + moulinette2.loc[mapping_PLF[i]-1, "Attribution finale"] = moulinette2.loc[mapping_PLF[duplicates_list[i][j]]-1, "Attribution finale"] + highest_score = score + return moulinette2 + + +def which_PLF(n,d): + """ + Identifie le PLF le plus proche et antérieur pour un numéro d'amendement donné. + + Parcourt un dictionnaire représentant les PLFs avec leurs indices de début correspondants et détermine + le PLF dont l'indice est le plus proche mais inférieur au numéro d'amendement spécifié. Cette fonction + est utile pour attribuer un amendement à un PLF spécifique basé sur son numéro d'ordre. + + Args: + n (int): Le numéro d'amendement pour lequel trouver le PLF correspondant. + d (dict): Un dictionnaire où chaque clé est le nom d'un PLF et chaque valeur est l'indice de début + de ce PLF dans une liste ou un DataFrame d'amendements. + + Returns: + str: Le nom du PLF le plus proche et antérieur au numéro d'amendement donné. Si aucun PLF antérieur + n'est trouvé (c'est-à -dire, tous les PLFs ont un indice de début supérieur au numéro d'amendement), + la fonction retourne `None`. + + Exemple: + >>> d = {"PLF 2018": 100, "PLF 2019": 200, "PLF 2020": 300} + >>> which_PLF(250, d) + 'PLF 2019' + >>> which_PLF(99, d) + None + """ + fdiff_min = float("inf") + cle_min = None + for cle,valeur in d.items(): + diff = abs(valeur-n) + if valeur<n and diff < fdiff_min: + fdiff_min = diff + cle_min = cle + return(cle_min) + +def most_common(lst): + """ + Trouve et retourne l'élément le plus commun dans une liste. + """ + return max(set(lst), key=lst.count) + + +def get_old_PLFs(liste_doc): + """ + Charge une série de fichiers Excel qui sont les extractions SIGNALE des projets de loi passés, et les concatène en un seul DataFrame. + Construit également un dictionnaire mappant le nom de chaque fichier PLF à son indice de début dans le DataFrame concaténé, + permettant de retracer l'historique de l'origine de chaque data point. + + Args: + liste_doc (list): Liste des chemins de fichiers Excel à charger. Le premier fichier dans la liste est considéré comme le point de départ + de l'historique, avec les fichiers suivants ajoutés séquentiellement. + + Returns: + tuple: Contient deux éléments: + - df_final (DataFrame): Un DataFrame pandas contenant toutes les données chargées à partir des fichiers Excel listés, + concaténées en une seule structure de données. + - mapping_historique (dict): Un dictionnaire où chaque clé est le nom de base d'un fichier PLF (sans le chemin ni l'extension) + et chaque valeur est l'indice de la première ligne de ce fichier dans le DataFrame concaténé, + permettant d'identifier l'origine de chaque enregistrement. + + Cette fonction est utile pour combiner des données de plusieurs périodes ou versions de PLFs en une seule structure pour une analyse + intégrée ou pour maintenir un historique des changements sur le temps. Elle automatise le processus de chargement, de concaténation, + et de mappage de l'origine des données pour faciliter le suivi et l'analyse. + """ + # Charger le premier fichier Excel + dfs = [pd.read_excel(liste_doc[0])] + l_historique = [0] # Initialiser l'historique avec 0 pour le premier fichier + + # Charger les fichiers restants et construire l'historique en une seule étape + for fichier in liste_doc[1:]: + nouveau_df = pd.read_excel(fichier) + l_historique.append(l_historique[-1] + dfs[-1].shape[0]) + dfs.append(nouveau_df) + + # Concaténer tous les DataFrames en une seule opération + df_final = pd.concat(dfs, ignore_index=True) + + # Utiliser une compréhension de dictionnaire pour construire mapping_historique + mapping_historique = {fichier.split("\\")[-1].split("_Signale.xlsx")[0]: taille + for fichier, taille in zip(liste_doc, l_historique)} + + return df_final, mapping_historique + + + +def compare_old_PLF(df,corps_dict2,mapping_PLF,nattributed,attributed,moulinette2,ressemblance,mapping_historique): + """ + Compare les amendements d'anciens PLF avec des amendements actuels de l'AN pour identifier des similitudes basées sur un score de ressemblance. + + Args: + df (DataFrame): DataFrame contenant les corps d'amendements actuels. + corps_dict2 (dict): Dictionnaire contenant les corps d'amendements d'anciens PLF. + mapping_PLF (list): Liste des identifiants d'amendements d'anciens PLF. + nattributed (DataFrame): DataFrame des amendements non attribués. + attributed (DataFrame): DataFrame des amendements déjà attribués. + moulinette2 (DataFrame): DataFrame pour la mise à jour des attributions. + ressemblance (float): Seuil de ressemblance pour considérer deux amendements comme similaires. + mapping_historique (dict): Dictionnaire pour le mapping historique des amendements. + + Returns: + DataFrame: Le DataFrame moulinette2 mis à jour avec les attributions réalisées. + """ + + elements = [str(i).replace("\n","").replace(" "," ").split("pertederecette")[0].upper() for i in list(df["Corps amdt"])] + elements_PLF = [i.replace("\n","").replace(" "," ").upper() for i in list(corps_dict2.values())] + sa = process.cdist(elements_PLF, elements, score_cutoff=ressemblance,workers=-1)#,scorer = fuzz.token_sort_ratio) + + duplicates_list = [] + score_list = [] + + + for distances in sa: + # Get indices of duplicates + indices = np.argwhere(~np.isin(distances, [0, 0.0])).flatten() + # Get names from indices + names = [x for x in indices]#indices#names = list(map(elements.__getitem__, indices)) + duplicates_list.append(names) + + scores = [distances[x] for x in indices] + score_list.append(scores) + + for i in range(len(score_list)): + if (mapping_PLF[i] in list(nattributed["N ° amdt"])): + if (len(duplicates_list[i]) > 0): + l = [] + if (len(corps_dict2[mapping_PLF[i]].split("\n")) < 5 and list(np.argsort(score_list[i])[:][::-1])[0] < (ressemblance*1.15 if ressemblance < 90 else 100)): + print(" ATTENTION PAS ATTRIBUE amendement numero "+str(mapping_PLF[i])+" est similaire à "+str([[df["Num amdt"].iloc[duplicates_list[i][j]],which_PLF(duplicates_list[i][j],mapping_historique),score_list[i][j]] for j in list(np.argsort(score_list[i])[:][::-1])])) + + else: + for j in list(np.argsort(score_list[i])[:][::-1]): + try: + text = str(df["Objet amdt"].iloc[duplicates_list[i][j]]) + text = re.sub(r'MEFSIN-DLF-|MEFSIN/DLF-|MESFIN-DLF-|MESFIN--DLF-|MEFSIN--DLF-|Economie-|MESFIN-|MEFSIN-|DGFiP-|DGFIP-|DLF-', '', text) + l.append(re.split(r'Intervenants : |[-–\\/]|\bIdentique\b', text)[1]) + except: + continue + print("amendement numero "+str(mapping_PLF[i])+" est similaire à "+str([[df["Num amdt"].iloc[duplicates_list[i][j]],which_PLF(duplicates_list[i][j],mapping_historique),score_list[i][j]] for j in list(np.argsort(score_list[i])[:][::-1])])) + print(l) + try: + if (most_common([i for i in l if i != ""]) != "Retiré" and most_common([i for i in l if i != ""]) != "inexistant" and most_common([i for i in l if i != ""]) != "NF"): + moulinette2.loc[mapping_PLF[i]-1,"Attribution finale"] = most_common([i for i in l if i != ""]) + print(most_common([i for i in l if i != ""])) + except: + continue + print("#"*20) + return(moulinette2) + +def compare_old_PLF_senat(df,corps_dict2,mapping_PLF,nattributed,attributed,moulinette2,ressemblance,mapping_historique): + """ + Compare les amendements d'anciens PLF avec des amendements actuels du Sénat pour identifier des similitudes basées sur un score de ressemblance. + + Args: + df (DataFrame): DataFrame contenant les corps d'amendements actuels. + corps_dict2 (dict): Dictionnaire contenant les corps d'amendements d'anciens PLF. + mapping_PLF (list): Liste des identifiants d'amendements d'anciens PLF. + nattributed (DataFrame): DataFrame des amendements non attribués. + attributed (DataFrame): DataFrame des amendements déjà attribués. + moulinette2 (DataFrame): DataFrame pour la mise à jour des attributions. + ressemblance (float): Seuil de ressemblance pour considérer deux amendements comme similaires. + mapping_historique (dict): Dictionnaire pour le mapping historique des amendements. + + Returns: + DataFrame: Le DataFrame moulinette2 mis à jour avec les attributions réalisées. + """ + elements = [str(i).replace("\n","").replace(" "," ").split("pertederecette")[0].upper() for i in list(df["Corps amdt"])] + elements_PLF = [i.replace("\n","").replace(" "," ").upper() for i in list(corps_dict2.values())] + sa = process.cdist(elements_PLF, elements, score_cutoff=ressemblance,workers=-1)#,scorer = fuzz.token_sort_ratio) + + duplicates_list = [] + score_list = [] + + + for distances in sa: + # Get indices of duplicates + indices = np.argwhere(~np.isin(distances, [0, 0.0])).flatten() + # Get names from indices + names = [x for x in indices]#indices#names = list(map(elements.__getitem__, indices)) + duplicates_list.append(names) + + scores = [distances[x] for x in indices] + score_list.append(scores) + + for i in range(len(score_list)): + if (mapping_PLF[i] in list(nattributed["N ° amdt"])): + if (len(duplicates_list[i]) > 0): + l = [] + if (len(corps_dict2[mapping_PLF[i]].split("\n")) <= 5 and list(np.argsort(score_list[i])[:][::-1])[0] < (ressemblance*1.15 if ressemblance <= 90 else 100)): + print(" ATTENTION PAS ATTRIBUE amendement numero "+str(mapping_PLF[i])+" est similaire à "+str([[df["Num amdt"].iloc[duplicates_list[i][j]],which_PLF(duplicates_list[i][j],mapping_historique),score_list[i][j]] for j in list(np.argsort(score_list[i])[:][::-1])])) + + else: + for j in list(np.argsort(score_list[i])[:][::-1]): + try: + text = str(df["Objet amdt"].iloc[duplicates_list[i][j]]) + text = re.sub(r'MEFSIN-DLF-|MEFSIN/DLF-|MESFIN-DLF-|MESFIN--DLF-|MEFSIN--DLF-|Economie-|MESFIN-|MEFSIN-|DGFiP-|DGFIP-|DLF-', '', text) + l.append(re.split(r'Intervenants : |[-–\\/]|\bIdentique\b', text)[1]) + except: + continue + print("amendement numero "+str(mapping_PLF[i])+" est similaire à "+str([[df["Num amdt"].iloc[duplicates_list[i][j]],which_PLF(duplicates_list[i][j],mapping_historique),score_list[i][j]] for j in list(np.argsort(score_list[i])[:][::-1])])) + print(l) + try: + if ((most_common([i for i in l if i != ""]) != "Retiré" and most_common([i for i in l if i != ""]) != "inexistant") and most_common([i for i in l if i != ""]) != "inexistant"): + moulinette2.loc[mapping_PLF[i]-1,"Attribution finale"] = most_common([i for i in l if i != ""]) + print(most_common([i for i in l if i != ""])) + except: + continue + print("#"*20) + return(moulinette2) + + + +def get_dicts(moulinetteCGI, moulinettePLF, moulinetteCIBS, moulinetteCDD, moulinetteCodeExt, moulinetteCodeMots): + """ + Crée et retourne des dictionnaires à partir de DataFrames spécifiques, associant nomenclatures, articles, + et bureaux à leurs compétences respectives dans différents contextes (CGI, PLF, CIBS, CDD, codes externes). + Chaque dictionnaire renvoyé associe des articles (comme la nomenclature ou les mots-clés) à des valeurs + représentant des compétences métier par bureau. + Args: + moulinetteCGI (DataFrame): DataFrame contenant les colonnes "Nomenclature CGI" et "Compétence " pour le Code Général des Impôts. + moulinettePLF (DataFrame): DataFrame contenant les colonnes "Article PLF" et "Compétence" pour le Projet de Loi de Finances. + moulinetteCIBS (DataFrame): DataFrame contenant les colonnes "Nomenclature CIBS" et "Compétence" pour les Biens et Serives. + moulinetteCDD (DataFrame): DataFrame contenant les colonnes "Nomenclature CDD" et "Compétence " pour le Code des Douanes. + moulinetteCodeExt (DataFrame): DataFrame contenant les colonnes "Nom du code", "Article", et "Ministère/direction compétentes" pour les codes externes à bercy. + moulinetteCodeMots (DataFrame): DataFrame contenant les colonnes "Mots-clés" et "Ministère/direction compétents" pour une association basée sur des mots-clés de l'exposé sommaire. + Returns: + tuple: Contient six dictionnaires (CGI, PLF, CIBS, CDD, CodeExt, Mots) où chaque dictionnaire associe des éléments clés à des compétences bureau pour un type de code ou des mots clés. + """ + CGI = {str(nomenclature): str(competence) for nomenclature, competence in zip(moulinetteCGI["Nomenclature CGI"], moulinetteCGI["Compétence "])} + PLF = {str(article): str(competence) for article, competence in zip(moulinettePLF["Article PLF"], moulinettePLF["Compétence"])} + CIBS = {str(nomenclature): str(competence) for nomenclature, competence in zip(moulinetteCIBS["Nomenclature CIBS"], moulinetteCIBS["Compétence"])} + CDD = {str(nomenclature): str(competence) for nomenclature, competence in zip(moulinetteCDD["Nomenclature CDD"], moulinetteCDD["Compétence "])} + CodeExt = {str(nom): (str(article) + " " + str(direction)).replace("nan ", "") for nom, article, direction in zip(moulinetteCodeExt["Nom du code"], moulinetteCodeExt["Article"], moulinetteCodeExt["Ministère/direction compétentes"])} + + Mots = {str(mot): str(direction) for mot, direction in zip(moulinetteCodeMots["Mots-clés"], moulinetteCodeMots["Ministère/direction compétents"])} + + return CGI, PLF, CIBS, CDD, CodeExt, Mots + + +def find_longest_substring(string, dictionary): + """ + Trouve la clé la plus longue dans un dictionnaire qui est une sous-chaîne de la chaîne donnée. + Args: + string (str): La chaîne de caractères dans laquelle rechercher les sous-chaînes. + dictionary (dict): Un dictionnaire dont les clés seront recherchées comme sous-chaînes dans `string`. + Returns: + str: La clé la plus longue du dictionnaire qui est une sous-chaîne de `string`. Si aucune clé correspondante + n'est trouvée, retourne une chaîne vide. + """ + normalized_string = string.replace(" ", "").upper() + + # Utilisez une compréhension de liste pour filtrer et trier les clés en une étape. + # Cela crée une liste de clés qui sont dans la chaîne, triées par longueur décroissante. + matching_keys = sorted( + (key for key in dictionary if key.replace(" ", "").upper() in normalized_string), + key=len, + reverse=True + ) + + # Retournez la première clé (la plus longue) s'il y a des correspondances, sinon retournez une chaîne vide. + return matching_keys[0] if matching_keys else "" + + +def get_place(d, nattributed): + """ + Extrait et associe l'information de position des amendements dans le texte à leurs numéros correspondants dans un dictionnaire. + + Parcourt un DataFrame contenant des numéros d'amendements et extrait leur "place" spécifique à partir d'un dictionnaire + donné. La "place" fait référence à la position de l'amendement dans le texte, identifiant si l'amendement vise un article + existant ou introduit de nouvelles dispositions non couvertes par les articles existants. + + Args: + d (dict): Dictionnaire avec les numéros d'amendements comme clés et le texte complet de l'amendement comme valeurs. + nattributed (DataFrame): DataFrame contenant au moins une colonne "N ° amdt" avec les numéros d'amendements. + + Returns: + dict: Un dictionnaire où chaque clé est un numéro d'amendement et chaque valeur est la "place" de cet amendement, + nettoyée et prête à l'utilisation. + """ + place_dict = {} + for i in tqdm(nattributed["N ° amdt"]): + if i != 0: # Assurez-vous que cette condition est nécessaire. + try: + place = d[i].split("------")[1].split("\n")[2].replace(":", "").replace("\n", "") + place_dict[i] = place.strip() # .strip() pour enlever les espaces blancs avant et après + except IndexError as e: + print(e) + pass + return place_dict + +def get_place_senat(d,nattributed): + """ + Extrait et associe l'information de position des amendements dans le texte à leurs numéros correspondants dans un dictionnaire. + + Parcourt un DataFrame contenant des numéros d'amendements et extrait leur "place" spécifique à partir d'un dictionnaire + donné. La "place" fait référence à la position de l'amendement dans le texte, identifiant si l'amendement vise un article + existant ou introduit de nouvelles dispositions non couvertes par les articles existants. + + Args: + d (dict): Dictionnaire avec les numéros d'amendements comme clés et le texte complet de l'amendement comme valeurs. + nattributed (DataFrame): DataFrame contenant au moins une colonne "N ° amdt" avec les numéros d'amendements. + + Returns: + dict: Un dictionnaire où chaque clé est un numéro d'amendement et chaque valeur est la "place" de cet amendement, + nettoyée et prête à l'utilisation. + """ + place_dict = dict() + for i in tqdm(list(nattributed["N ° amdt"])): + if i != 0: + try: + place = "ARTICLE"+d[i].split("\nARTICLE")[1].split("\n")[0] + place_dict[i] = place + except: + continue + return(place_dict) + +def attribution_via_texte(moulinette2,attributed,nattributed,corps_dict,corps_dict2,ressemblance,CGI,CDD,CIBS,PLF,CodeExt,Mots,place_dict): + """ + Attribue des amendements à des catégories spécifiques basées sur leur contenu texte. + + Cette fonction examine le contenu texte de chaque amendement pour déterminer à quelle catégorie + il appartient (par exemple, CGI, CDD, CIBS, PLF, etc.) en utilisant des règles métiers de recherche + et de correspondance de texte. Les amendements sont ensuite mis à jour dans le DataFrame `moulinette2` + avec l'attribution finale basée sur ces critères. + + Args: + moulinette2 (DataFrame): DataFrame contenant les amendements à attribuer. + attributed (DataFrame): DataFrame contenant les amendements déjà attribués (non utilisé dans cette fonction). + nattributed (DataFrame): DataFrame contenant les amendements non attribués. + corps_dict (dict): Dictionnaire avec le corps des amendements non attribués. + corps_dict2 (dict): Dictionnaire alternatif avec le corps des amendements non attribués. + ressemblance (int): Seuil de similarité pour la correspondance de texte. + CGI (dict): Dictionnaire des articles du Code Général des Impôts pour l'attribution. + CDD (dict): Dictionnaire des articles des Douanes pour l'attribution. + CIBS (dict): Dictionnaire des articles CIBS pour l'attribution. + PLF (dict): Dictionnaire des articles du Projet de Loi de Finances pour l'attribution. + CodeExt (dict): Dictionnaire des codes externes pour l'attribution aux externes bercy (code du cinéma par ex). + Mots (dict): Dictionnaire des mots-clés pour l'attribution. + place_dict (dict): Dictionnaire de la position des amendements (non utilisé dans cette fonction). + + Returns: + DataFrame: Le DataFrame `moulinette2` mis à jour avec les attributions finales pour chaque amendement. + """ + # Initialisation des dictionnaires pour l'attribution + attrib_CGI = {} + attrib_CDD = {} + attrib_CIBS = {} + attrib_PLF = {} + attrib_CodeExt = {} + attrib_Mots = {} + pattern_digits = re.compile('\d+') + + def find_article_and_assign(amdt_id, corps, dictionnaire, attrib_dict, default_value=None): + try: + article = find_longest_substring(corps, dictionnaire) + attrib_dict[amdt_id] = dictionnaire.get(article, default_value) + moulinette2.loc[amdt_id-1, "Attribution finale"] = attrib_dict[amdt_id] + print(f"amendement {amdt_id} a été identifié traitant de l'article {article} il a donc été attribué à {attrib_dict[amdt_id]}") + except IndexError: + pass + + for i in tqdm(nattributed["N ° amdt"]): + corps = corps_dict[i].replace("\n", "") + if any(fuzz.partial_ratio(keyword, corps, processor=rapidfuzz.utils.default_process) > ressemblance for keyword in ["code général des impôts", "CGI"]): + find_article_and_assign(i, corps, CGI, attrib_CGI) + elif fuzz.partial_ratio("biens et services", corps) > ressemblance: + find_article_and_assign(i, corps, CIBS, attrib_CIBS) + elif fuzz.partial_ratio("douanes", corps) > ressemblance: + find_article_and_assign(i, corps, CDD, attrib_CDD, "DGDDI") + else: + try: + text = corps_dict2.get(i, "") + premier_digit = pattern_digits.findall(text.split("ARTICLE")[1])[0] if "ARTICLE" in text else None + if premier_digit: + key = premier_digit + if "alinéa" in text.lower(): + second_digit = pattern_digits.findall(text.split("alinéa", 1)[1])[0] if "alinéa" in text else None + key = f"{premier_digit} {second_digit}" if second_digit else premier_digit + attrib_PLF[i] = PLF.get(key, "") + moulinette2.loc[i - 1, "Attribution finale"] = attrib_PLF[i] + print(f"amendement {i} a été identifié comme PLF traitant de l'article {key} il a donc été attribué à {attrib_PLF[i]}") + except: + continue + + return(moulinette2) + +def get_dicts_expo_sommaire(moulinetteMots_expo): + """ + Construit un dictionnaire à partir d'un DataFrame en associant des mots-clés à des ministères ou bureaux compétents. + + Cette fonction parcourt chaque ligne du DataFrame fourni et construit un dictionnaire où chaque clé est un + mot-clé unique et chaque valeur est le ministère ou la direction compétente associée à ce mot-clé. + + Args: + moulinetteMots_expo (DataFrame): DataFrame contenant au moins deux colonnes: "Mots-clés" et + "Ministère/direction compétents". Chaque ligne représente une association + unique entre un mot-clé et un ministère/direction compétent. + Returns: + dict: Dictionnaire où les clés sont des mots-clés et les valeurs sont les ministères ou directions + compétents associés à ces mots-clés. + + Exemple d'utilisation: + >>> moulinetteMots_expo = pd.DataFrame({ + ... "Mots-clés": ["budget", "fiscalité"], + ... "Ministère/direction compétents": ["Ministère des Finances", "Direction Générale des Finances"] + ... }) + >>> get_dicts_expo_sommaire(moulinetteMots_expo) + {'budget': 'Ministère des Finances', 'fiscalité': 'Direction Générale des Finances'} + """ + # Utilisation de la compréhension de dictionnaire pour une construction directe + Mots_expo = {row["Mots-clés"]: row["Ministère/direction compétents"] for _, row in moulinetteMots_expo.iterrows()} + return Mots_expo + + +def attribution_via_mots_cles_expo_sommaire(moulinette2, attributed, nattributed, corps_dict, corps_dict2, score_ressemblance, Mots, place_dict, Mots_expo, CodeExt): + """ + Attribue des amendements à des bureaux spécifiques basées sur la correspondance de mots-clés + dans leur texte de l'exposé sommaire, en utilisant différents dictionnaires de mots-clés. + + Cette fonction examine le texte de chaque amendement pour identifier des correspondances avec des mots-clés + définis dans plusieurs dictionnaires (Codes Exterieurs à Bercy, Mots, Mots_expo). Si un mot-clé correspondant est trouvé + avec une similarité supérieure ou égale à un seuil de score de ressemblance, l'amendement est attribué + à la catégorie associée à ce mot-clé. + + Args: + moulinette2 (DataFrame): DataFrame contenant les amendements à attribuer. + attributed (DataFrame): DataFrame des amendements déjà attribués (non utilisé dans cette fonction). + nattributed (DataFrame): DataFrame des amendements non attribués. + corps_dict (dict): Dictionnaire contenant le début des amendements non attribués. + corps_dict2 (dict): Dictionnaire alternatif sensiblement plus long contenant le texte des amendements (non utilisé dans cette fonction). + score_ressemblance (int): Seuil de score de similarité pour la correspondance des mots-clés. + Mots (dict): Dictionnaire des mots-clés généraux et leurs attributions correspondantes. + place_dict (dict): Dictionnaire de la position des amendements (non utilisé dans cette fonction). + Mots_expo (dict): Dictionnaire des mots-clés spécifiques à l'exposé sommaire et leurs attributions. + CodeExt (dict): Dictionnaire des codes externes et leurs attributions. + + Returns: + DataFrame: Le DataFrame moulinette2 mis à jour avec les attributions finales pour chaque amendement + basé sur les mots-clés correspondants. + + La fonction déléguée `attribuer_code` est utilisée pour rechercher des correspondances de mots-clés dans le + texte de chaque amendement et attribuer l'amendement à la catégorie appropriée si une correspondance est trouvée. + L'attribution est effectuée séquentiellement pour chaque dictionnaire de mots-clés fourni. + """ + def attribuer_code(i, dictionnaire, attrib_dict): + cleaned_text = corps_dict[i].replace("\n", "") + for key, value in dictionnaire.items(): + if fuzz.partial_ratio(key, cleaned_text, processor=rapidfuzz.utils.default_process) >= score_ressemblance: + attrib_dict[i] = value + print(f"amendement {i} contient {key} il a donc été attribué à {value}") + moulinette2.loc[i-1, "Attribution finale"] = value + break # Sortie immédiate après la première correspondance + + attrib_CodeExt = {} + attrib_Mots = {} + attrib_Mots_expo = {} + + for i in tqdm(nattributed["N ° amdt"].tolist()): + attribuer_code(i, CodeExt, attrib_CodeExt) + attribuer_code(i, Mots, attrib_Mots) + attribuer_code(i, Mots_expo, attrib_Mots_expo) + + return moulinette2 + +def missing_code(moulinette2): + """ + Identifie et affiche les articles les plus communs non attribués pour des codes de loi spécifiques. + + Cette fonction parcourt une liste prédéfinie de codes de loi (par exemple, "c douanes", "CGI") et + recherche dans un DataFrame donné les cas où l'attribution à l'article des codes pour ces lois est manquante. + Elle affiche ensuite les articles non attribués les plus communs pour chaque code de loi. + + Args: + moulinette2 (DataFrame): DataFrame contenant des données d'amendements, incluant les colonnes + "Attribution à l'article des codes" et "Code & loi" pour identifier les + attributions manquantes, ainsi que "Article du CGI ou LPF retraité" pour + déterminer les articles les plus communs non attribués. + + Aucune valeur n'est retournée. La fonction se contente d'afficher les résultats. + + Exemple d'utilisation: + Supposons que `moulinette2` soit un DataFrame contenant des informations sur des amendements, + y compris leur code de loi et l'attribution à l'article des codes. Cette fonction peut être utilisée + pour identifier quels articles liés au Code Général des Impôts (CGI) ou au code des douanes n'ont pas + encore été attribués, en affichant les plus communs parmi eux afin de modifier les règles métier intelligement. + """ + codes_loi = ["c douanes", "CGI"] # Liste des "Code & loi" à vérifier + + for code in codes_loi: + print(f"codes qui manquent pour le {code}") + + # Filtrage direct des données non attribuées pour le "Code & loi" actuel + nattributed = moulinette2[~moulinette2["Attribution à l'article des codes"].notnull() & + (moulinette2["Code & loi"] == code)] + + # Affichage des articles les plus communs non attribués + common_articles = Counter(nattributed["Article du CGI ou LPF retraité"].astype(str)).most_common() + print(common_articles) + + +def ecrire_resultat(moulinette_location): + file_path = '..\\logs.txt' + + with open(file_path,encoding="utf-8") as file: + text = file.read() + + #### Première partie + d = {} + for i in text.split("identifier les numéros d’amendements identiques au sein d’une même lecture")[1].split("tour")[1].split("Identification des identiques dans les PLF précédents")[0].split("\n"): + if "amendement" in i: + try: + n_amdt = i.split("amendement numero ")[1].split(" ")[0] + corr = i.split("[")[1].split("]")[0].replace(",","") + corr = corr.replace(" "," (")+")" + d[n_amdt] = d.get(n_amdt,'')+str(corr)+"/ " + except: + continue + # j'ai un dictionnaire avec des clés en entier de 1 à n (non continu) + # une liste de taille n qui va stocker mes résultats + try: + taille = max([int(i) for i in list(d.keys())]) + except: + taille = 0 + liste_sim = [] + for i in range(1,taille): + if str(i) in list(d.keys()): + liste_sim.append(d[str(i)]) + else: + liste_sim.append("") + + #### Seconde partie + d = {} + for i in text.split("Identification des identiques dans les PLF précédents")[1].split("attribution supplémentaire mots clés")[0].split("\n"): + if "amendement numero" in i: + n_amdt = i.split("amendement numero ")[1].split(" ")[0] + corr = i.split("similaire à ")[1] + #corr = corr.replace("[","(").replace("]",")") + d[n_amdt] = d.get(n_amdt,'')+str(corr)+" " + # j'ai un dictionnaire avec des clés en entier de 1 à n (non continu) + # une liste de taille n qui va stocker mes résultats + try: + taille = max([int(i) for i in list(d.keys())]) + except: + taille = 0 + liste_prec = [] + for i in range(1,taille): + if str(i) in list(d.keys()): + liste_prec.append(d[str(i)]) + else: + liste_prec.append("") + ### Troisième partie + d = {} + for i in text.split("Attribution de l’amendement par l’IA")[1].split("Codes qu'il manque au niveau de la moulinette")[0].split("\n"): + if ("amendement" in i and "déjà " not in i): + try: + n_amdt = i.split("amendement ")[1].split(" ")[0] + corr = i.split("comme ")[1].split(" ")[0]+" "+i.split("article ")[1].split("il")[0]+" "+i.split("attribué à ")[1] + d[n_amdt] = corr + except: + try: + n_amdt = i.split("amendement ")[1].split("contient")[0] + corr = i.split("contient")[1].split(" il a donc été")[0] + d[n_amdt] = corr + except: + try: + n_amdt = i.split("amendement ")[1].split(" ")[0] + corr = i.split("traitant de l'article ")[1].split(" il a donc été")[0] + d[n_amdt] = corr + except: + continue + print(d) + try: + taille = max([int(i) for i in list(d.keys())]) + except: + taille = 0 + liste_txt = [] + for i in range(1,taille): + if str(i) in list(d.keys()): + liste_txt.append(d[str(i)]) + else: + liste_txt.append("") + print(liste_txt) + + + moulinette2 = pd.read_excel(moulinette_location,sheet_name=1) + moulinette2 = moulinette2[moulinette2["N ° amdt"] != 0] + + for i in [liste_sim,liste_prec,liste_txt]: + while(len(i) != len(moulinette2)): + i.append("") + + moulinette2["Similaires"] = liste_sim + moulinette2["Similaires_prec"] = liste_prec + moulinette2["attributions IA"] = liste_txt + + moulinette2 = moulinette2[['N ° amdt', "Sort de l'amdt", 'État', 'Article PLF + alinéa', + 'Article du CGI ou LPF', 'Article du CGI ou LPF retraité', 'Code & loi', + "Attribution à l'article des codes (non retraitée)", + "Attribution à l'article des codes", "Attribution à l'article du PLF",'Similaires', 'Similaires_prec', 'attributions IA','Attribution finale']] + + GT = pd.read_csv("..\\attribution_finale.csv") + moulinette2["Attribution finale avec DTNUM"] = GT["Attribution finale"] + moulinette2 = moulinette2.rename(columns={'Attribution finale': 'Attribution Moulinette'}) + + moulinette2.to_csv("..\\attribution_finale.csv",sep=";",encoding="utf-16") + moulinette2.to_excel("..\\attribution_finale.xlsx") + return(0) diff --git a/Fuzzy_attrib/tkinter_interface.py b/Fuzzy_attrib/tkinter_interface.py new file mode 100644 index 0000000000000000000000000000000000000000..8449107068e7c44cabd1782dd1d3dffea1aff781 --- /dev/null +++ b/Fuzzy_attrib/tkinter_interface.py @@ -0,0 +1,239 @@ +import tkinter as tk +from tkinter import filedialog +from tkinter import messagebox +from functions import * +import sys + +doc = f"""Bienvenue sur le Trieur d'amendements DLF! + +Pour utiliser cet outil, veuillez suivre les étapes suivantes: + +1. Indiquez l'emplacement de la moulinette, du PLF à trier et de l'historique des PLFs en cliquant sur les boutons 'Parcourir...'. + +2. Définissez le score de ressemblance souhaité entre les amendements du PLF à trier et ceux de l'historique des PLFs en déplaçant le curseur. + +3. Cliquez sur le bouton 'Trier les amendements' pour lancer le tri. + +4. Le résultat sera affiché dans une boîte de dialogue. + +Si vous avez besoin d'aide, n'hésitez pas à cliquer sur le bouton '?' pour afficher cette documentation. + +Inputs: +- Moulinette location: emplacement de la moulinette déjà lancée manuellement +- PLF à trier location: emplacement des liasses du PLF à trier +- Historique des PLFs location: emplacement de l'historique des PLFs précédents au format xlsx +- Score de ressemblance: score de ressemblance souhaité entre les amendements + +Bonne utilisation! +""" + +class AideWindow: + def __init__(self, master): + self.master = master + master.title("Aide") + self.label = tk.Label(master, text=doc) + self.label.pack() + +class TrieurAmendementsDLF: + def __init__(self, master): + self.master = master + master.title("Trieur d'amendements DLF") + + # Input pour la moulinette + self.label_moulinette = tk.Label(master, text="Moulinette déjà lancée:") + self.label_moulinette.grid(row=0, column=0) + self.entry_moulinette = tk.Entry(master) + self.entry_moulinette.grid(row=0, column=1) + self.button_moulinette = tk.Button(master, text="Parcourir...", command=self.get_moulinette_location) + self.button_moulinette.grid(row=0, column=2) + + # Input pour le PLF à trier + self.label_plf_a_trier = tk.Label(master, text="Liasses PLF à trier:") + self.label_plf_a_trier.grid(row=1, column=0) + self.entry_plf_a_trier = tk.Entry(master) + self.entry_plf_a_trier.grid(row=1, column=1) + self.button_plf_a_trier = tk.Button(master, text="Parcourir...", command=self.get_plf_a_trier_location) + self.button_plf_a_trier.grid(row=1, column=2) + + # Input pour l'historique des PLFs + self.label_historique_plfs = tk.Label(master, text="Historique des PLFs:") + self.label_historique_plfs.grid(row=2, column=0) + self.entry_historique_plfs = tk.Entry(master) + self.entry_historique_plfs.grid(row=2, column=1) + self.button_historique_plfs = tk.Button(master, text="Parcourir...", command=self.get_historique_plfs_location) + self.button_historique_plfs.grid(row=2, column=2) + + # Curseur pour le score de ressemblance + self.score_ressemblance = tk.IntVar(value=80) + self.label_score_ressemblance = tk.Label(master, text="Score de ressemblance PLF actuel:") + self.label_score_ressemblance.grid(row=3, column=0) + self.scale_score_ressemblance = tk.Scale(master, from_=1, to=100, orient=tk.HORIZONTAL,length=200,variable=self.score_ressemblance) + self.scale_score_ressemblance.grid(row=3, column=1) + + # Curseur pour le score de ressemblance + self.score_ressemblance2 = tk.IntVar(value=70) + self.label_score_ressemblance2 = tk.Label(master, text="Score de ressemblance PLF passés:") + self.label_score_ressemblance2.grid(row=4, column=0) + self.scale_score_ressemblance2 = tk.Scale(master, from_=1, to=100, orient=tk.HORIZONTAL,length=200,variable=self.score_ressemblance2) + self.scale_score_ressemblance2.grid(row=4, column=1) + + # Bouton pour trier les amendements + self.button_trier = tk.Button(master, text="Trier les amendements", command=self.trier_amendements) + self.button_trier.grid(row=5, column=1) + + # Case besoin d'aide + self.label_aide = tk.Label(master, text="Besoin d'aide?") + self.label_aide.grid(row=6, column=0) + self.button_aide = tk.Button(master, text="?", command=self.afficher_aide) + self.button_aide.grid(row=6, column=1) + + def get_moulinette_location(self): + location = filedialog.askopenfilename() + self.entry_moulinette.insert(0, location) + + def get_plf_a_trier_location(self): + location = filedialog.askdirectory() + self.entry_plf_a_trier.insert(0, location) + + def get_historique_plfs_location(self): + location = filedialog.askdirectory() + self.entry_historique_plfs.insert(0, location) + + def trier_amendements(self): + #sys.stdout = open('./logs.txt', 'w') + + try: + os.remove("../logs.txt") + except OSError: + pass + + + moulinette_location = self.entry_moulinette.get() + plf_a_trier_location = self.entry_plf_a_trier.get() + historique_plfs_location = self.entry_historique_plfs.get() + score_ressemblance = self.scale_score_ressemblance.get() + score_ressemblance2 = self.scale_score_ressemblance2.get() + + functions_mapping = { + "AN": { + "update_function": get_retires_and_inexistants, + "extraction_function": get_corps_amendements, + "extraction_place_function": get_place, + "comparison_function": compare_same_PLF, + "comparison_function_old": compare_old_PLF + }, + "SENAT": { + "update_function": get_retires_and_inexistants_senat, + "extraction_function": get_corps_amendements_senat, + "extraction_place_function": get_place_senat, + "comparison_function": compare_same_PLF_senat, + "comparison_function_old": compare_old_PLF_senat + } + } + + + print("###############\n"*5) + print("on charge les amendements") + print("###############\n"*5) + d,doc_type = identifier_document_type(plf_a_trier_location) + doc_functions = functions_mapping.get(doc_type, None) + if doc_functions: + update_function = doc_functions.get("update_function") + extraction_function = doc_functions.get("extraction_function") + extraction_place_function = doc_functions.get("extraction_place_function") + comparison_function = doc_functions.get("comparison_function") + comparison_function_old = doc_functions.get("comparison_function_old") + + moulinette2 = pd.read_excel(moulinette_location,sheet_name=1) + moulinette2[moulinette2.columns[0]] = pd.to_numeric(moulinette2[moulinette2.columns[0]].replace('[a-zA-Z]', '', regex=True)) + moulinette2 = moulinette2[moulinette2["N ° amdt"] != 0] + nattributed = moulinette2[~moulinette2["Attribution finale"].notnull()] + attributed = moulinette2[moulinette2["Attribution finale"].notnull()] + get_attrib_score(attributed,nattributed) + + print("###############\n"*5) + print("on enlève les amendements qui ont été retirés avant publication et ceux qui sont inexistants") + print("###############\n"*5) + + # Application de la fonction choisie pour mettre à jour 'moulinette2', si un type de document valide est fourni + if update_function: + moulinette2 = update_function(nattributed, moulinette2, d) + nattributed = moulinette2[~moulinette2["Attribution finale"].notnull()] + attributed = moulinette2[moulinette2["Attribution finale"].notnull()] + get_attrib_score(attributed,nattributed) + + print("###############\n"*5) + print("on récupère le corps des amendements") + print("###############\n"*5) + # Extraction des corps des amendements en utilisant la fonction appropriée + if extraction_function: + corps_dict, corps_dict2, corps_dict3 = extraction_function(d) + mapping_PLF = mapping_PLF_function(corps_dict2) + write_PLF(corps_dict3) + + print("###############\n"*5) + print("Attribution de l’amendement par l’IA (en fonction des mots clés relevés dans le corps") + print("###############\n"*5) + moulinettePLF, moulinetteCGI, moulinetteCIBS, moulinetteCDD, moulinetteCodeExt, moulinetteMots = (pd.read_excel(moulinette_location, sheet_name=sheet_num) for sheet_num in [2, 3, 4, 5, 6, 7]) + CGI,PLF,CIBS,CDD,CodeExt,Mots = get_dicts(moulinetteCGI,moulinettePLF,moulinetteCIBS,moulinetteCDD,moulinetteCodeExt,moulinetteMots) + + # Extraction des places en utilisant la fonction appropriée + if extraction_place_function: + place_dict = extraction_place_function(d, nattributed) + + moulinette2 = attribution_via_texte(moulinette2,attributed,nattributed,corps_dict,corps_dict2,score_ressemblance,CGI,CDD,CIBS,PLF,CodeExt,Mots,place_dict) + nattributed = moulinette2[~moulinette2["Attribution finale"].notnull()] + attributed = moulinette2[moulinette2["Attribution finale"].notnull()] + get_attrib_score(attributed,nattributed) + + print("###############\n"*5) + print("identifier les numéros d’amendements identiques au sein d’une même lecture") + print("###############\n"*5) + if comparison_function: + moulinette2 = comparison_function(corps_dict2, mapping_PLF, nattributed, attributed, moulinette2, score_ressemblance) + + nattributed = moulinette2[~moulinette2["Attribution finale"].notnull()] + attributed = moulinette2[moulinette2["Attribution finale"].notnull()] + get_attrib_score(attributed,nattributed) + + print("###############\n"*5) + print("Identification des identiques dans les PLF précédents") + print("###############\n"*5) + liste_doc = glob.glob(historique_plfs_location+"\\*.xlsx") + df,mapping_historique = get_old_PLFs(liste_doc) + + if comparison_function: + moulinette2 = comparison_function_old(df, corps_dict2, mapping_PLF, nattributed, attributed, moulinette2, score_ressemblance2, mapping_historique) + + nattributed = moulinette2[~moulinette2["Attribution finale"].notnull()] + attributed = moulinette2[moulinette2["Attribution finale"].notnull()] + get_attrib_score(attributed,nattributed) + + print("attribution supplémentaire mots clés") + try: + moulinetteMots_expo = pd.read_excel(moulinette_location,sheet_name=8) + Mots_expo = get_dicts_expo_sommaire(moulinetteMots_expo) + moulinette2 = attribution_via_mots_cles_expo_sommaire(moulinette2,attributed,nattributed,corps_dict2,corps_dict3,score_ressemblance,Mots,place_dict,Mots_expo,CodeExt) + nattributed = moulinette2[~moulinette2["Attribution finale"].notnull()] + attributed = moulinette2[moulinette2["Attribution finale"].notnull()] + get_attrib_score(attributed,nattributed) + except Exception as e: + print(e) + print("pas de feuille exposé sommaire") + + print("###############\n"*5) + print("Codes qu'il manque au niveau de la moulinette") + print("###############\n"*5) + missing_code(moulinette2) + moulinette2.to_csv("../attribution_finale.csv") + ecrire_resultat(moulinette_location) + tk.messagebox.showinfo("Information","Terminé, ouvrir le fichier logs.txt pour plus d'informations, bon courage à tous et toutes! Joseph") + + def afficher_aide(self): + aide_window = tk.Toplevel(self.master) + app = AideWindow(aide_window) + +if __name__ == "__main__": + root = tk.Tk() # Création de l'instance de Tk + app = TrieurAmendementsDLF(root) # Création de l'instance de la classe TrieurAmendementsDLF + root.mainloop() # Lancement de la boucle principale d'événements de Tk \ No newline at end of file