"""
LLM+TAG: answer user prompts with local functions before calling the LLM.

For more information on LLM+TAG see http://shaunwagner.com/projects/tag

LLM+TAG attempts to handle user prompts with local functions instead of
sending the prompt to the LLM. This reduces lag and cost. It also means that
common functionality such as the "help" and "customer info" functions can be
handled by LLM+TAG to avoid memorizing the exact function names. For example,
you can type "help" or "get help" or "help me" and LLM+TAG will know to call
the help function for you.
"""

import importlib
import inspect
import os
import re
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple

try:
    import Levenshtein
except ImportError:
    # The C-accelerated package is optional; _edit_distance() falls back to a
    # pure-Python implementation that returns the same distance.
    Levenshtein = None

# Global registry mapping a cleaned utterance template to (function, defaults).
_REGISTRY = {}
_CONFIG_LOADED = False

# "module.path.func(args):" -> group 1 is the dotted path, group 2 the args.
_FUNC_WITH_DEFAULTS_RE = re.compile(r'(\w+(?:\.\w+)*)\((.*)\):')
# Characters removed from utterance templates (square brackets are kept).
_TEMPLATE_JUNK_RE = re.compile(r'[^a-z0-9\s\[\]]')
# Characters removed from user queries (square brackets are dropped too).
_QUERY_JUNK_RE = re.compile(r'[^a-z0-9\s]')
_WHITESPACE_RE = re.compile(r'\s+')
# First standalone run of three or more digits, used as the wildcard value.
_WILDCARD_VALUE_RE = re.compile(r'\b\d{3,}\b')
# Punctuation stripped from a sentence before it is matched.
_SENTENCE_PUNCT_RE = re.compile(r'[^\w\s]')

# Words that mark the start of a new request after "and" / "also" / "then".
_SENTENCE_STARTERS = [
    "i", "what", "show", "get", "is", "can", "please", "summarize", "list",
]
_SENTENCE_SPLIT_RE = re.compile(
    rf"(?i)([\.\!\?\n]+|(?<=\s)(?:and|also|then)"
    rf"(?=\s+(?:{'|'.join(_SENTENCE_STARTERS)})\b))"
)


def _resolve_callable(full_path: str) -> Callable[..., Any]:
    """Import ``module.path.func`` and return the function object.

    Raises whatever the import or attribute lookup raises, and ValueError
    when *full_path* contains no dot.
    """
    module_name, func_name = full_path.rsplit('.', 1)
    module = importlib.import_module(module_name)
    return getattr(module, func_name)


def _load_config(filepath: str = "lib/tag.conf") -> None:
    """
    An internal function that loads a LLM-TAG file of functions and utterances.

    The file is processed line by line, top to bottom. A line ending in ``:``
    selects the current function, optionally with default keyword arguments
    (``pkg.mod.func(key="value"):``). Every following non-comment line is an
    utterance template registered for that function. Blank lines and lines
    starting with ``#`` or ``//`` are ignored. A function that fails to load
    is reported and its utterances are skipped.
    """
    # Global variables - only process the config file once.
    global _REGISTRY, _CONFIG_LOADED

    # Make sure the config file exists.
    if not os.path.exists(filepath):
        print(f"⚠️ Warning: Config file not found at {filepath}")
        return

    current_func_obj = None
    current_defaults = {}

    with open(filepath, 'r', encoding="utf-8") as f:
        for raw_line in f:
            line = raw_line.strip()

            # Skip blank and comment lines
            if not line or line.startswith(('#', '//')):
                continue

            if line.endswith(':'):
                match = _FUNC_WITH_DEFAULTS_RE.match(line)
                if match:
                    # Function definition with defaults: func_path(args):
                    full_path, args_str = match.group(1), match.group(2)
                else:
                    # Function definition without defaults: func_path:
                    full_path, args_str = line[:-1], ""

                try:
                    current_func_obj = _resolve_callable(full_path)
                    current_defaults = {}
                    if args_str.strip():
                        # SECURITY: eval() executes arbitrary Python taken
                        # from the config file. Only load config files from
                        # a trusted source.
                        current_defaults = eval(f"dict({args_str})")
                except Exception as e:
                    print(f"❌ Failed to load tool {full_path}: {e}")
                    current_func_obj = None
                    current_defaults = {}

            # Map utterance to the current function object
            elif current_func_obj:
                clean_utterance = _TEMPLATE_JUNK_RE.sub('', line.lower()).strip()
                # Store both the function and its defaults
                _REGISTRY[clean_utterance] = (current_func_obj, current_defaults)

    _CONFIG_LOADED = True


def _sentence_splitter(text: str) -> list:
    """Splits text into logical segments based on intent markers.

    Segments end at runs of ``.``, ``!``, ``?`` or newlines, and at the words
    "and", "also" or "then" when followed by one of the starter words.
    Segments of one character or less are dropped.
    """
    # With one capturing group, re.split alternates text and delimiter.
    parts = _SENTENCE_SPLIT_RE.split(text)

    sentences = []
    # NOTE(review): the delimiter is kept on the END of the preceding segment.
    # For conjunctions this means "show help and get info" yields
    # "show help and" / "get info" - confirm the trailing "and" is intended,
    # since it counts as an unmatched word when scoring.
    for body, delimiter in zip(parts[0::2], parts[1::2]):
        combined = (body + delimiter).strip()
        if len(combined) > 1:
            sentences.append(combined)

    tail = parts[-1].strip()
    if len(parts) % 2 != 0 and len(tail) > 1:
        sentences.append(tail)

    return sentences


def _accepts_keyword(func: Callable[..., Any], name: str) -> bool:
    """Return True when *func* can be called with keyword argument *name*."""
    try:
        parameters = inspect.signature(func).parameters
    except (TypeError, ValueError):
        # No introspectable signature: do not risk passing an unknown keyword.
        return False

    named = parameters.get(name)
    if named is not None and named.kind in (
        inspect.Parameter.POSITIONAL_OR_KEYWORD,
        inspect.Parameter.KEYWORD_ONLY,
    ):
        return True
    return any(
        p.kind is inspect.Parameter.VAR_KEYWORD for p in parameters.values()
    )


def _execute_tool(func_tuple: Tuple[Callable[..., Any], Dict[str, Any]],
                  extracted_val: Optional[str] = None) -> Optional[str]:
    """Safely executes the tool function with defaults.

    Args:
        func_tuple: ``(function, defaults)`` as stored in the registry; a bare
            callable is also accepted and treated as having no defaults.
        extracted_val: Wildcard value pulled from the user's sentence, if any.
            It is passed as the ``customerid`` keyword when the function
            accepts that keyword, overriding a default of the same name.

    Returns:
        Whatever the tool returns, or None when the tool raised (the error is
        printed).
    """
    try:
        # Unpack function and defaults
        if isinstance(func_tuple, tuple):
            func, defaults = func_tuple
        else:
            # Fallback for any old-style entries
            func, defaults = func_tuple, {}

        # The previous check looked for "[" in str(func), i.e. in the repr of
        # the function object, so the extracted value was never forwarded.
        if extracted_val and _accepts_keyword(func, "customerid"):
            return func(**{**defaults, "customerid": extracted_val})

        return func(**defaults)
    except Exception as e:
        print(f"Execution Error: {e}")
        return None


def _edit_distance(first: str, second: str) -> int:
    """Return the Levenshtein distance between two strings.

    Uses the ``Levenshtein`` package when it is installed, otherwise a
    two-row dynamic-programming fallback.
    """
    if Levenshtein is not None:
        return Levenshtein.distance(first, second)

    if first == second:
        return 0
    # Keep the inner row as short as possible.
    if len(first) < len(second):
        first, second = second, first

    previous = list(range(len(second) + 1))
    for i, char_a in enumerate(first, 1):
        current = [i]
        for j, char_b in enumerate(second, 1):
            current.append(min(
                previous[j] + 1,                       # deletion
                current[j - 1] + 1,                    # insertion
                previous[j - 1] + (char_a != char_b),  # substitution
            ))
        previous = current
    return previous[-1]


def _normalize(text: str, keep_brackets: bool = False) -> str:
    """Lowercase *text*, drop punctuation and collapse whitespace."""
    junk = _TEMPLATE_JUNK_RE if keep_brackets else _QUERY_JUNK_RE
    return _WHITESPACE_RE.sub(' ', junk.sub('', text.lower())).strip()


def similarity(user_query: str, utterance: str) -> float:
    """
    This function calculates similarity between a user query and an utterance
    template. It is based on existing algorithms for Levenshtein distance and
    Monge-Elkan similarity.

    Template tokens of the form ``[name]`` are wildcards: each one absorbs one
    leftover user word at no cost.

    The function returns a similarity score between 0 and 100.
    """
    u_raw = _normalize(user_query, keep_brackets=False)
    t_raw = _normalize(utterance, keep_brackets=True)

    u_len = len(u_raw)
    t_len = len(t_raw)

    if u_len == 0 and t_len == 0:
        return 100.0
    if u_len == 0 or t_len == 0:
        return 0.0

    u_tokens = u_raw.split()
    t_tokens = t_raw.split()

    # 1. Remove exact matches, one template occurrence per user occurrence.
    #    Iterate over a copy because u_tokens is modified in the loop.
    for token in list(u_tokens):
        if token in t_tokens:
            u_tokens.remove(token)
            t_tokens.remove(token)

    # 2. Separate wildcards from static template words
    wildcards = [t for t in t_tokens if t.startswith('[') and t.endswith(']')]
    static_templates = [t for t in t_tokens if t not in wildcards]

    total_cost = 0

    # 3. Greedy pairing: repeatedly take the (template word, user word) pair
    #    with the smallest edit distance until either side runs out.
    #    min() returns the first minimum, matching a strict "<" scan.
    while static_templates and u_tokens:
        cost, t_idx, u_idx = min(
            (
                (_edit_distance(t_word, u_word), t_idx, u_idx)
                for t_idx, t_word in enumerate(static_templates)
                for u_idx, u_word in enumerate(u_tokens)
            ),
            key=lambda candidate: candidate[0],
        )
        total_cost += cost
        static_templates.pop(t_idx)
        u_tokens.pop(u_idx)

    # 4. Wildcard fulfillment: each wildcard consumes one remaining user
    #    token (from the front) at 0 cost.
    consumed = min(len(wildcards), len(u_tokens))
    wildcards = wildcards[:len(wildcards) - consumed]
    u_tokens = u_tokens[consumed:]

    # 5. Penalties: every character of every unmatched word counts.
    total_cost += sum(len(u) for u in u_tokens)
    total_cost += sum(len(t) for t in static_templates)
    total_cost += sum(len(w) for w in wildcards)

    # 6. Normalize score (0-100)
    score = 100 * (u_len + t_len - total_cost) / (u_len + t_len)
    return max(0.0, score)  # Ensure non-negative


def find_best_utterance(user_query: str, utterances: Iterable[str],
                        threshold: float = 90) -> Tuple[Optional[str], float]:
    """
    Find the best matching utterance for a user query using MEWF similarity.

    Returns ``(utterance, score)`` for the highest-scoring utterance when its
    score reaches *threshold*, otherwise ``(None, best_score)``. On ties the
    earliest utterance wins. An empty *utterances* yields ``(None, 0)``.
    """
    if not utterances:
        return (None, 0)

    best_utterance = None
    best_score = 0

    for utterance in utterances:
        score = similarity(user_query, utterance)
        if score > best_score:
            best_score = score
            best_utterance = utterance

    if best_score >= threshold:
        return (best_utterance, best_score)
    return (None, best_score)


def tag(user_prompt: str, config_path: str = "lib/tag.conf") -> Tuple[str, bool]:
    """
    Main functional entry point.

    Splits the prompt into sentences, and replaces every sentence that
    matches a registered utterance (score of 95 or more) with the output of
    the mapped function followed by a newline.

    Returns (augmented_prompt, all_replaced_boolean)
    """
    # Load config if not already loaded or if a custom path is provided
    if not _CONFIG_LOADED or config_path != "lib/tag.conf":
        _load_config(config_path)

    sentences = _sentence_splitter(user_prompt)
    untagged_count = len(sentences)
    final_output = user_prompt

    templates = list(_REGISTRY)

    for sentence in sentences:
        # Normalize the sentence for matching
        lsentence = _SENTENCE_PUNCT_RE.sub('', sentence.lower())

        # Use MEWF similarity to find the best matching utterance
        best_utterance, _score = find_best_utterance(
            lsentence, templates, threshold=95
        )
        if not best_utterance:
            continue

        func_tuple = _REGISTRY[best_utterance]
        extracted_val = None

        # Extract wildcard values if template contains brackets
        if "[" in best_utterance:
            val_match = _WILDCARD_VALUE_RE.search(sentence)
            if val_match:
                extracted_val = val_match.group(0)

        result_string = _execute_tool(func_tuple, extracted_val)

        if result_string is not None:
            final_output = final_output.replace(
                sentence, str(result_string) + "\n"
            )
            untagged_count -= 1

    return (final_output, untagged_count == 0)

# The shortest path to an answer is one you already know. 🦔