293 lines
11 KiB
Python
293 lines
11 KiB
Python
from langchain.tools import tool
|
||
from langgraph.prebuilt import InjectedState
|
||
from langchain_core.tools import InjectedToolCallId
|
||
from langchain_core.messages import ToolMessage
|
||
from langgraph.types import Command
|
||
from tavily import TavilyClient
|
||
from pathlib import Path
|
||
from typing import List, Dict, Annotated
|
||
import sys
|
||
import os
|
||
from langgraph.types import interrupt
|
||
|
||
from .StateElements.TodoElement import TodoElement
|
||
from .VectorDatabase import VectorDatabase
|
||
from .InterruptPayload import InterruptPayload
|
||
|
||
@tool
|
||
def append_part_to_report(contenu:str)->str:
|
||
"""
|
||
Permet d'ajouter une nouvelle partie au rapport de stage
|
||
|
||
Args:
|
||
contenu (str): Partie à ajouter, écris ici ce que tu veux
|
||
|
||
Returns:
|
||
str: Retour, une confirmation, ou un message d'erreur
|
||
"""
|
||
try:
|
||
# Récupérer le chemin vers le point d'entrée
|
||
base_dir: Path = Path(sys.argv[0]).resolve().parent
|
||
full_path = reports_dir / "RAPPORT_STAGE.md"
|
||
|
||
query= interrupt(InterruptPayload({
|
||
content: contenu
|
||
}).toJSON())
|
||
|
||
response = InterruptPayload.fromJSON(query)
|
||
if response.isAccepted():
|
||
with open(full_path, "a", encoding="utf-8") as f: # Écrire le contenu
|
||
f.write(response.get("content"))
|
||
|
||
return "Requête acceptée et validée ! Tu peux considérer cette tâche comme complétée."
|
||
else:
|
||
return "ERREUR! L'utilisateur a refusé ta demande. Tu devrais lui demander pourquoi avoir refusé, et comment améliorer cette partie."
|
||
|
||
except Exception as e:
|
||
return f"Erreur lors de l'écriture: {str(e)}"
|
||
|
||
|
||
@tool
|
||
def internet_search(query: str)->dict:
|
||
"""
|
||
Rechercher une information sur internet
|
||
|
||
Args:
|
||
query (str): Terme recherché
|
||
Returns:
|
||
dict: Retour de la recherche
|
||
"""
|
||
response = interrupt(InterruptPayload({
|
||
'query': query
|
||
}).toJSON())
|
||
|
||
resp = InterruptPayload.fromJSON(response) # Je reforme mon objet depuis la string json
|
||
|
||
if resp.isAccepted():
|
||
return TavilyClient().search(resp.get("query"), model='auto')
|
||
else:
|
||
return {'error': "Utilisation de cet outil refusée par l'utilisateur"}
|
||
|
||
@tool
|
||
def editTodo(index:int, todoState:int, state: Annotated[dict, InjectedState], tool_call_id: Annotated[str, InjectedToolCallId])->Command: # https://stackoverflow.com/a/79525434
|
||
"""
|
||
Modifier l'état d'une tâche (TODO)
|
||
|
||
Args:
|
||
index (int): Index de la tâche à modifier, en commançant à 0 pour la première tâche.
|
||
todoState (int): Nouvel état. 0 pour "non commencé, 1 pour "en cours", 2 pour "complété"
|
||
"""
|
||
if "todo" not in state.keys(): return Command(update={"messages": [ToolMessage(content="Echec!", tool_call_id=tool_call_id)]})
|
||
if len(state["todo"]) <= index:
|
||
# Erreur, l'index est trop grand
|
||
return Command(update={"messages": [ToolMessage(content="Index en dehors de la liste, echec!", tool_call_id=tool_call_id)]})
|
||
|
||
state["todo"] = [TodoElement.fromJSON(e) for e in state["todo"]] # Convertion vers de vraies instances
|
||
state["todo"][index].state = todoState # Modification de l'état de cette tâche
|
||
|
||
# Toutes les tâches complétées ?
|
||
found = False
|
||
for task in state["todo"]: # Pour chaque tâche
|
||
if task.state != 2: # Si elle n'est pas terminée
|
||
found = True
|
||
break
|
||
|
||
if not found: state["todo"] = [] # Toutes les tâches terminées, je peux clear la TODO list du state
|
||
return Command(update={
|
||
"messages": [ToolMessage(content="Réussite!", tool_call_id=tool_call_id)],
|
||
"todo": [x.toJSON() for x in state["todo"]] # Update du state, # medium.com/@o39joey/a-comprehensive-guide-to-langgraph-managing-agent-state-with-tools-ae932206c7d7
|
||
})
|
||
|
||
@tool
|
||
def addTodo(name:str, description:str, state: Annotated[dict, InjectedState], tool_call_id: Annotated[str, InjectedToolCallId])->Command:
|
||
"""
|
||
Ajouter une nouvelle tâche/TODO
|
||
|
||
Args:
|
||
name (str): Nom de cette tâche
|
||
description (str): Une ou deux phrases pour décrire le travail à effectuer dans ce TODO
|
||
"""
|
||
if "todo" not in state.keys(): state["todo"] = []
|
||
|
||
state["todo"] = [TodoElement.fromJSON(e) for e in state["todo"]] # Convertion vers de vraies instances
|
||
state["todo"].append(TodoElement(name, description))
|
||
return Command(update={
|
||
"messages": [ToolMessage(content="Réussite!", tool_call_id=tool_call_id)],
|
||
"todo": [x.toJSON() for x in state["todo"]] # Update du state, # medium.com/@o39joey/a-comprehensive-guide-to-langgraph-managing-agent-state-with-tools-ae932206c7d7
|
||
})
|
||
|
||
@tool
|
||
def removeTodo(index:int, state: Annotated[dict, InjectedState], tool_call_id: Annotated[str, InjectedToolCallId])->Command:
|
||
"""
|
||
Retirer une tâche/TODO de la liste des tâches
|
||
|
||
Args:
|
||
index (int): Position de la tâche dans la liste, commence à 0 pour le premier TODO
|
||
"""
|
||
if "todo" not in state.keys(): return Command(update={"messages": [ToolMessage(content="Echec!", tool_call_id=tool_call_id)]})
|
||
if len(state["todo"]) <= index:
|
||
# Erreur, l'index est trop grand
|
||
return Command(update={"messages": [ToolMessage(content="Index en dehors de la liste, echec!", tool_call_id=tool_call_id)]})
|
||
|
||
state['todo'].pop(index)
|
||
return Command(update={
|
||
"messages": [ToolMessage(content="Réussite!", tool_call_id=tool_call_id)],
|
||
"todo": [x for x in state["todo"]] # Update du state, # medium.com/@o39joey/a-comprehensive-guide-to-langgraph-managing-agent-state-with-tools-ae932206c7d7
|
||
})
|
||
|
||
@tool
|
||
def read_file(file_path: str) -> str:
|
||
"""
|
||
Lire le contenu d'un fichier texte.
|
||
|
||
Args:
|
||
file_path (str): Chemin d'accès relatif vers le fichier à lire.
|
||
|
||
Returns:
|
||
str: Le contenu du fichier, ou un message d'erreur.
|
||
"""
|
||
try:
|
||
base_dir:str = Path(sys.argv[0]).resolve().parent.as_posix() # Récupérer le chemin vers le point d'entrée du programme
|
||
full_path:str = base_dir + (file_path if file_path.startswith('/') else f'/{file_path}') # Puis générer le chemin vers le fichier
|
||
|
||
with open(full_path, "r", encoding="utf-8") as f:
|
||
content = f.read()
|
||
|
||
return content
|
||
|
||
except Exception as e:
|
||
return f"Erreur lors de la lecture : {str(e)}"
|
||
|
||
@tool
|
||
def get_skill(skill_name:str=None)->str:
|
||
"""
|
||
Obtenir un skill, la description de comment faire quelque chose.
|
||
|
||
Args:
|
||
skill_name (str, optional): Nom du skill recherché. Si ce n'est pas donné, listera les skills disponibles.
|
||
|
||
Returns:
|
||
str: Sans nom de skill, la liste de ceux disponibles. Si un nom de skill est donné, l'ensemble de ce skill.
|
||
"""
|
||
try:
|
||
base_dir:str = Path(sys.argv[0]).resolve().parent.as_posix() # Récupérer le chemin vers le point d'entrée du programme
|
||
full_path:str = base_dir + "/skills.md" # Puis générer le chemin vers le fichier
|
||
|
||
with open(full_path, "r", encoding="utf-8") as f:
|
||
content = f.read()
|
||
|
||
if skill_name is None:
|
||
# Liste des skills
|
||
names = []
|
||
for part in content.split("---")[1:]: # Pas besoin de la première partie
|
||
names.append(part.splitlines()[1].split(' ')[1]) # Récupérer le nom du skill à la seconde ligne
|
||
return str(names)
|
||
|
||
else:
|
||
# Récupérer un skill
|
||
for part in content.split("---")[1:]:
|
||
if skill_name.lower() in part.lower(): # Dégueulasse pour l'opti mais c'est rapide à implémenter
|
||
# Si c'est ce skill qui est recherché
|
||
return f"{content.split("---")[0]}\n\n{part}"
|
||
|
||
return "Ce skill n'existe pas ! Regarde la liste des skills en rappelant cet outil sans arguments !"
|
||
|
||
except Exception as e:
|
||
return f"Erreur lors de la lecture : {str(e)}"
|
||
|
||
@tool
|
||
def search_in_files(query:str, state: Annotated[dict, InjectedState])->str:
|
||
"""
|
||
Rechercher quelque chose dans les documents enregistrés localement.
|
||
Dans le cas actuel, ces documents sont des rapports hebdomadaires de stage.
|
||
|
||
Args:
|
||
query (str): La requête recherchée.
|
||
|
||
Returns:
|
||
str: Échantillons de documents correspondants, concaténés en une seule chaîne de caractères.
|
||
"""
|
||
bdd = VectorDatabase.getChroma() # Récupère l'unique instance de cette BDD, c'est un SIngleton
|
||
|
||
retrieved_docs = bdd.similarity_search(query, k=5) # 5 documents
|
||
|
||
# Conversion des documents en texte
|
||
docs_content = "\n".join(
|
||
[f"Document {i+1}:\n{doc.page_content}" for i,doc in enumerate(retrieved_docs)]
|
||
)
|
||
|
||
# Sauvegarde des données dans le State
|
||
state["ragQuery"] = query
|
||
state["ragDocuments"] = retrieved_docs
|
||
|
||
return docs_content # Retourne la liste de documents trouvés
|
||
|
||
@tool
|
||
def write_week_report(numero_semaine:int, contenu:str)->str:
|
||
"""
|
||
Écrire un rapport sur une semaine du stage. Sauvegardera ce rapport dans un fichier en mémoire pour un usage futur.
|
||
|
||
Args:
|
||
numero_semaine (int): Semaine du stage. Commence à 1 pour la première semaine
|
||
contenu (str): Ce qu'il faut écrire dans ce rapport
|
||
|
||
Returns:
|
||
str: CHemin vers le fichier, ou une erreur en cas de problème
|
||
"""
|
||
try:
|
||
# Récupérer le chemin vers le point d'entrée
|
||
base_dir: Path = Path(sys.argv[0]).resolve().parent
|
||
reports_dir = base_dir / "rapports_resumes" # Chemin du dossier des rapports
|
||
reports_dir.mkdir(parents=True, exist_ok=True) # Créer le dossier
|
||
|
||
file_name = f"rapport_semaine_{numero_semaine}.txt"
|
||
full_path = reports_dir / file_name
|
||
|
||
with open(full_path, "w", encoding="utf-8") as f: # Écrire le contenu
|
||
f.write(contenu)
|
||
|
||
return str(full_path)
|
||
|
||
except Exception as e:
|
||
return f"Erreur lors de l'écriture: {str(e)}"
|
||
|
||
@tool
|
||
def write_library_tools_details_on_internship(contenu:str)->str:
|
||
"""
|
||
Enregistrer les détails sur le stage.
|
||
Utilise cet outil pour enregistrer tous les outils, logiciels, programmes, entreprises, ect.. utilisés pendant le stage.
|
||
|
||
Args:
|
||
contenu (str): Une liste de tous les éléments intéréssants, avec quelques détails sur chacun.
|
||
|
||
Returns:
|
||
str: CHemin vers le fichier, ou une erreur en cas de problème
|
||
"""
|
||
try:
|
||
# Récupérer le chemin vers le point d'entrée
|
||
base_dir: Path = Path(sys.argv[0]).resolve().parent
|
||
reports_dir = base_dir / "rapports_resumes" # Chemin du dossier des rapports
|
||
reports_dir.mkdir(parents=True, exist_ok=True) # Créer le dossier
|
||
|
||
file_name = f"rapport_outils.txt"
|
||
full_path = reports_dir / file_name
|
||
|
||
with open(full_path, "w", encoding="utf-8") as f: # Écrire le contenu
|
||
f.write(contenu)
|
||
|
||
return str(full_path)
|
||
|
||
except Exception as e:
|
||
return f"Erreur lors de l'écriture: {str(e)}"
|
||
|
||
def getTools()->List['Tools']:
|
||
"""
|
||
Récupérer la liste des tools
|
||
"""
|
||
return [internet_search, append_part_to_report, editTodo, read_file, search_in_files, addTodo, removeTodo, get_skill]
|
||
|
||
def getWeeklyReportTools()->List['Tools']:
|
||
"""
|
||
Récupérer la liste des tools, POUR LE LLM EN CHARGE DE FAIRE LES RAPPORTS DE CHAQUE SEMAINE
|
||
"""
|
||
return [write_week_report, write_library_tools_details_on_internship, internet_search, search_in_files] |