from langchain.tools import tool from langgraph.prebuilt import InjectedState from langchain_core.tools import InjectedToolCallId from langchain_core.messages import ToolMessage from langgraph.types import Command from tavily import TavilyClient from pathlib import Path from typing import List, Dict, Annotated import sys import os from langgraph.types import interrupt from .StateElements.TodoElement import TodoElement from .VectorDatabase import VectorDatabase from .InterruptPayload import InterruptPayload @tool def internet_search(query: str)->dict: """ Rechercher une information sur internet Args: query (str): Terme recherché Returns: dict: Retour de la recherche """ response = interrupt(InterruptPayload({ 'query': query }).toJSON()) resp = InterruptPayload.fromJSON(response) # Je reforme mon objet depuis la string json if resp.isAccepted(): return TavilyClient().search(resp.get("query"), model='auto') else: return {'error': "Utilisation de cet outil refusée par l'utilisateur"} @tool def write_file(file_path:str, content: str, append:bool=True) -> str: """ Ecrire et ajouter du texte dans un fichier. Args: file_path (str): Chemin d'accès relatif vers le fichier à écrire. content (str): Contenu à écrire dans le fichier. append (bool, optional): Faut-il AJOUTER(True) au fichier, ou REMPLACER son contenu(False) ? True par défaut. Returns: str: Le chemin d'accès relatif vers le fichier en cas de réussite, ou une erreur en cas d'echec """ try: base_dir:str = Path(sys.argv[0]).resolve().parent.as_posix() # Récupérer le chemin vers le point d'entrée du programme full_path:str = base_dir + (file_path if file_path.startswith('/') else f'/{file_path}') # Puis générer le chemin vers le fichier mode = "a" if append else "w" # Mode d'écriture with open(full_path, mode, encoding="utf-8") as f: # Puis j'écris f.write(content) return str(full_path) except Exception as e: return f"Erreur lors de l'écriture: {str(e)}" @tool def editTodo(index:int, todoState:int, state: Annotated[dict, InjectedState], tool_call_id: Annotated[str, InjectedToolCallId])->Command: # https://stackoverflow.com/a/79525434 """ Modifier l'état d'une tâche (TODO) Args: index (int): Index de la tâche à modifier, en commançant à 0 pour la première tâche. todoState (int): Nouvel état. 0 pour "non commencé, 1 pour "en cours", 2 pour "complété" """ if "todo" not in state.keys(): return Command(update={"messages": [ToolMessage(content="Echec!", tool_call_id=tool_call_id)]}) if len(state["todo"]) <= index: # Erreur, l'index est trop grand return Command(update={"messages": [ToolMessage(content="Index en dehors de la liste, echec!", tool_call_id=tool_call_id)]}) state["todo"] = [TodoElement.fromJSON(e) for e in state["todo"]] # Convertion vers de vraies instances state["todo"][index].state = todoState # Modification de l'état de cette tâche # Toutes les tâches complétées ? found = False for task in state["todo"]: # Pour chaque tâche if task.state != 2: # Si elle n'est pas terminée found = True break if not found: state["todo"] = [] # Toutes les tâches terminées, je peux clear la TODO list du state return Command(update={ "messages": [ToolMessage(content="Réussite!", tool_call_id=tool_call_id)], "todo": [x.toJSON() for x in state["todo"]] # Update du state, # medium.com/@o39joey/a-comprehensive-guide-to-langgraph-managing-agent-state-with-tools-ae932206c7d7 }) @tool def addTodo(name:str, description:str, state: Annotated[dict, InjectedState], tool_call_id: Annotated[str, InjectedToolCallId])->Command: """ Ajouter une nouvelle tâche/TODO Args: name (str): Nom de cette tâche description (str): Une ou deux phrases pour décrire le travail à effectuer dans ce TODO """ if "todo" not in state.keys(): state["todo"] = [] state["todo"] = [TodoElement.fromJSON(e) for e in state["todo"]] # Convertion vers de vraies instances state["todo"].append(TodoElement(name, description)) return Command(update={ "messages": [ToolMessage(content="Réussite!", tool_call_id=tool_call_id)], "todo": [x.toJSON() for x in state["todo"]] # Update du state, # medium.com/@o39joey/a-comprehensive-guide-to-langgraph-managing-agent-state-with-tools-ae932206c7d7 }) @tool def removeTodo(index:int, state: Annotated[dict, InjectedState], tool_call_id: Annotated[str, InjectedToolCallId])->Command: """ Retirer une tâche/TODO de la liste des tâches Args: index (int): Position de la tâche dans la liste, commence à 0 pour le premier TODO """ if "todo" not in state.keys(): return Command(update={"messages": [ToolMessage(content="Echec!", tool_call_id=tool_call_id)]}) if len(state["todo"]) <= index: # Erreur, l'index est trop grand return Command(update={"messages": [ToolMessage(content="Index en dehors de la liste, echec!", tool_call_id=tool_call_id)]}) state['todo'].pop(index) return Command(update={ "messages": [ToolMessage(content="Réussite!", tool_call_id=tool_call_id)], "todo": [x for x in state["todo"]] # Update du state, # medium.com/@o39joey/a-comprehensive-guide-to-langgraph-managing-agent-state-with-tools-ae932206c7d7 }) @tool def read_file(file_path: str) -> str: """ Lire le contenu d'un fichier texte. Args: file_path (str): Chemin d'accès relatif vers le fichier à lire. Returns: str: Le contenu du fichier, ou un message d'erreur. """ try: base_dir:str = Path(sys.argv[0]).resolve().parent.as_posix() # Récupérer le chemin vers le point d'entrée du programme full_path:str = base_dir + (file_path if file_path.startswith('/') else f'/{file_path}') # Puis générer le chemin vers le fichier with open(full_path, "r", encoding="utf-8") as f: content = f.read() return content except Exception as e: return f"Erreur lors de la lecture : {str(e)}" @tool def get_skill(skill_name:str=None)->str: """ Obtenir un skill, la description de comment faire quelque chose. Args: skill_name (str, optional): Nom du skill recherché. Si ce n'est pas donné, listera les skills disponibles. Returns: str: Sans nom de skill, la liste de ceux disponibles. Si un nom de skill est donné, l'ensemble de ce skill. """ try: base_dir:str = Path(sys.argv[0]).resolve().parent.as_posix() # Récupérer le chemin vers le point d'entrée du programme full_path:str = base_dir + "/skills.md" # Puis générer le chemin vers le fichier with open(full_path, "r", encoding="utf-8") as f: content = f.read() if skill_name is None: # Liste des skills names = [] for part in content.split("---")[1:]: # Pas besoin de la première partie names.append(part.splitlines()[1].split(' ')[1]) # Récupérer le nom du skill à la seconde ligne return str(names) else: # Récupérer un skill for part in content.split("---")[1:]: if skill_name.lower() in part.lower(): # Dégueulasse pour l'opti mais c'est rapide à implémenter # Si c'est ce skill qui est recherché return f"{content.split("---")[0]}\n\n{part}" return "Ce skill n'existe pas ! Regarde la liste des skills en rappelant cet outil sans arguments !" except Exception as e: return f"Erreur lors de la lecture : {str(e)}" @tool def search_in_files(query:str, state: Annotated[dict, InjectedState])->str: """ Rechercher quelque chose dans les documents enregistrés localement. Dans le cas actuel, ces documents sont des rapports hebdomadaires de stage. Args: query (str): La requête recherchée. Returns: str: Échantillons de documents correspondants, concaténés en une seule chaîne de caractères. """ bdd = VectorDatabase.getChroma() # Récupère l'unique instance de cette BDD, c'est un SIngleton retrieved_docs = bdd.similarity_search(query, k=5) # 5 documents # Conversion des documents en texte docs_content = "\n".join( [f"Document {i+1}:\n{doc.page_content}" for i,doc in enumerate(retrieved_docs)] ) # Sauvegarde des données dans le State state["ragQuery"] = query state["ragDocuments"] = retrieved_docs return docs_content # Retourne la liste de documents trouvés @tool def write_week_report(numero_semaine:int, contenu:str)->str: """ Écrire un rapport sur une semaine du stage. Sauvegardera ce rapport dans un fichier en mémoire pour un usage futur. Args: numero_semaine (int): Semaine du stage. Commence à 1 pour la première semaine contenu (str): Ce qu'il faut écrire dans ce rapport Returns: str: CHemin vers le fichier, ou une erreur en cas de problème """ try: # Récupérer le chemin vers le point d'entrée base_dir: Path = Path(sys.argv[0]).resolve().parent reports_dir = base_dir / "rapports_resumes" # Chemin du dossier des rapports reports_dir.mkdir(parents=True, exist_ok=True) # Créer le dossier file_name = f"rapport_semaine_{numero_semaine}.txt" full_path = reports_dir / file_name with open(full_path, "w", encoding="utf-8") as f: # Écrire le contenu f.write(contenu) return str(full_path) except Exception as e: return f"Erreur lors de l'écriture: {str(e)}" @tool def write_library_tools_details_on_internship(contenu:str)->str: """ Enregistrer les détails sur le stage. Utilise cet outil pour enregistrer tous les outils, logiciels, programmes, entreprises, ect.. utilisés pendant le stage. Args: contenu (str): Une liste de tous les éléments intéréssants, avec quelques détails sur chacun. Returns: str: CHemin vers le fichier, ou une erreur en cas de problème """ try: # Récupérer le chemin vers le point d'entrée base_dir: Path = Path(sys.argv[0]).resolve().parent reports_dir = base_dir / "rapports_resumes" # Chemin du dossier des rapports reports_dir.mkdir(parents=True, exist_ok=True) # Créer le dossier file_name = f"rapport_outils.txt" full_path = reports_dir / file_name with open(full_path, "w", encoding="utf-8") as f: # Écrire le contenu f.write(contenu) return str(full_path) except Exception as e: return f"Erreur lors de l'écriture: {str(e)}" def getTools()->List['Tools']: """ Récupérer la liste des tools """ return [internet_search, write_file, editTodo, read_file, search_in_files, addTodo, removeTodo, get_skill] def getWeeklyReportTools()->List['Tools']: """ Récupérer la liste des tools, POUR LE LLM EN CHARGE DE FAIRE LES RAPPORTS DE CHAQUE SEMAINE """ return [write_week_report, write_library_tools_details_on_internship, internet_search, search_in_files]