#!/usr/bin/env python3 from prefect import flow, task from prefect.artifacts import create_markdown_artifact import email import subprocess import json import asyncio import litellm # Make sure litellm is installed and configured def get_emails(): """ Execute the notmuch search command to get emails. The command is: notmuch search --format=json tag:inbox and not tag:filed Each line of output is expected to be a JSON object. """ cmd = ["notmuch", "search", "--format=json", "tag:inbox", "and", "not", "tag:filed", "and", "not", "tag:analyzed", "and", "not", "tag:work"] result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0: raise RuntimeError("notmuch search command failed: " + result.stderr) emails = [] try: emails = json.loads(result.stdout) except json.JSONDecodeError as e: create_markdown_artifact(key="json-decode-failure", markdown=line) print("Error decoding JSON:", e) return emails @task def fetch_email(query): cmd = ["notmuch", "show", "--format=raw", query] result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0: raise RuntimeError("notmuch search command failed: " + result.stderr) email_msg = str(email.message_from_string(result.stdout))[:50000] return email_msg @task async def determine_action(analysis): """ Take an analysis of an email and generate an action """ prompt = ( f"## The following is an spam analysis of an email\n" f"Analysis:\n```\n{analysis}\n```. \n" f"based on this analysis, SIMPLY OUTPUT A SINGLE TOKEN 'SPAM' or 'NOTSPAM' depending on if the analysis determined if the email was spam or not. NO SUPPLEMENTAL TEXT.\n" ) await create_markdown_artifact(key="prompt", markdown=prompt) messages = [{ "content": prompt, "role": "user"}] response = await litellm.acompletion( model="ollama/llama3.2:latest", messages=messages, api_base="http://ollama.jb55.com", stream=False ) action = response.choices[0].message.content.strip() await create_markdown_artifact(key="action", markdown=action) return action @task async def analyze_email(email): """ Analyze an email for spam using your local LLM via litellm.acompletion. This function builds a prompt using the email's subject and authors, then sends it to the local LLM running on http://localhost:11434. """ subject = email.get("subject", "") authors = email.get("authors", "") query = email.get("query")[0].split(" ")[0] content = fetch_email(query) prompt = ( #f"# Determine whether the following email is spam. \n" f"Email: ```\n{content}\n```. \n" f"## Determine whether the email above is spam.\n" f"Provide a brief explanation of your decision.\n" ) await create_markdown_artifact(key="prompt", markdown=prompt) messages = [{ "content": prompt, "role": "user"}] response = await litellm.acompletion( model="ollama/llama3.2:latest", messages=messages, api_base="http://ollama.jb55.com", stream=False ) analysis = response.choices[0].message.content await create_markdown_artifact(key="analysis", markdown=analysis) return analysis @task async def process_action(action, email): query = email.get("query")[0].split(" ")[0] cmd = [] if action == "SPAM": cmd = ["notmuch", "tag", "+spam", "+analyzed", "-inbox", query] elif action == "NOTSPAM": cmd = ["notmuch", "tag", "+analyzed", query] else: raise RuntimeError("unknown action: " + action) cmdstr = " ".join(cmd) result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0: raise RuntimeError("notmuch tag command failed: " + result.stderr) report = f"## Query\n\n{query}\n\n## Action\n\n{action}\n\n## Command\n\n{cmdstr}\n\n## Result\n\n{result.stdout}\n" await create_markdown_artifact(key="process", markdown=report) @flow async def analyze_emails(emails): print(f"Found {len(emails)} emails.\n") for email in emails: analysis = await analyze_email(email) action = await determine_action(analysis) await process_action(action, email) if __name__ == "__main__": emails = get_emails() if len(emails) > 0: asyncio.run(analyze_emails(emails)) else: print(f"No emails to analyze. Quitting.")