"""Prefect flow that analyses Nostr threads tagged #damus and distributes a report.

The pipeline fetches (currently simulated) thread content, asks an LLM for
themes, builds a detailed markdown report, converts it to HTML and hands it to
the (currently simulated) email and Telegram senders. Every intermediate result
is also stored as a Prefect markdown artifact.
"""

from prefect import flow, task
from prefect.artifacts import create_markdown_artifact
from litellm import completion
import markdown  # Ensure you have installed the 'markdown' package (pip install markdown)

# Single source of truth for the model used by every LLM-backed task.
LLM_MODEL = "gpt-3.5-turbo"


def _complete_prompt(prompt: str) -> str:
    """Send *prompt* as a single user message and return the stripped reply text.

    Raises:
        ValueError: If the first choice of the response carries no text
            content. Failing loudly here keeps an empty LLM reply from
            surfacing later as an opaque ``AttributeError`` on ``None``.
    """
    response = completion(
        model=LLM_MODEL,
        messages=[{"role": "user", "content": prompt}],
    )
    reply = response.choices[0].message.content
    if reply is None:
        raise ValueError(f"LLM response from {LLM_MODEL!r} contained no text content")
    return reply.strip()


@task
def fetch_nostr_content() -> str:
    """Return the Nostr threads tagged #damus as one newline-separated string.

    The threads are hard-coded sample data; in production this would query an
    API or database. The content is also published as the ``nostr-content``
    artifact.
    """
    content = (
        "Thread 1: Loving the new features of Damus! #damus\n"
        "Thread 2: The user interface of Damus could be improved. #damus\n"
        "Thread 3: Damus community is very supportive. #damus"
    )
    create_markdown_artifact(key="nostr-content", markdown=content)
    return content


@task
def generate_themes_list(text: str) -> str:
    """Ask the LLM for a bare list of themes found in *text*.

    The prompt instructs the model to return only the list, without preamble.
    The reply is published as the ``themes-list`` artifact and returned.

    Raises:
        ValueError: If the LLM reply contains no text.
    """
    prompt = (
        f"Extract a list of themes from this: {text}\n\n"
        "Do not include any preamble or further explanation."
    )
    themes = _complete_prompt(prompt)
    create_markdown_artifact(key="themes-list", markdown=themes)
    return themes


@task
def generate_thread_themes(text: str) -> str:
    """Ask the LLM for the common threads and the main reason #damus is used.

    The reply is published as the ``thread-themes`` artifact and returned.

    Raises:
        ValueError: If the LLM reply contains no text.
    """
    prompt = (
        "Tell me the theme and highlight some common threads associated with "
        "these Nostr threads that are all #damus. "
        "Specifically mention the main reason #damus is hashtagged. "
        f"These are the threads: {text}"
    )
    thread_themes = _complete_prompt(prompt)
    create_markdown_artifact(key="thread-themes", markdown=thread_themes)
    return thread_themes


@task
def merge_themes(themes_list: str, thread_themes: str) -> str:
    """Combine both theme analyses into one markdown document.

    Each input is placed under its own bold heading. The result is published
    as the ``merged-themes`` artifact and returned.
    """
    merged = f"**Themes List:**\n{themes_list}\n\n**Thread Themes:**\n{thread_themes}"
    create_markdown_artifact(key="merged-themes", markdown=merged)
    return merged


@task
def generate_themes_and_threads_report(merged_themes: str, content: str) -> str:
    """Ask the LLM for a detailed report built from the themes and raw threads.

    The prompt is designed to mimic the detailed analysis required by the n8n
    workflow. The reply is published as the ``themes-threads-report`` artifact
    and returned.

    Raises:
        ValueError: If the LLM reply contains no text.
    """
    prompt = (
        "**Task:** Analyze the following Nostr threads containing the hashtag #damus. "
        "Provide a detailed report with examples based on the following themes.\n\n"
        f"## Themes Extracted\n{merged_themes}\n\n"
        f"## Nostr Threads\n{content}\n\n"
        "1. **Overall Theme:** Summarize the central topic(s) discussed across the threads.\n"
        "2. **Common Threads:** Identify recurring topics or ideas that unify the posts.\n"
        "3. **Key Highlights:** Extract specific examples or quotes that illustrate prominent themes.\n"
        "4. **Insights and Observations:** Offer insights on how the #damus community engages with the app and its ecosystem.\n"
        "5. **Suggestions for Improvement:** If applicable, suggest ways to enhance user experience or community engagement.\n\n"
        "Please use bullet points or numbered lists for clarity and include relevant quotes or examples."
    )
    report = _complete_prompt(prompt)
    create_markdown_artifact(key="themes-threads-report", markdown=report)
    return report


@task
def convert_markdown_to_html(markdown_text: str) -> str:
    """Render *markdown_text* to HTML with the ``markdown`` package.

    The HTML is published as the ``html-report`` artifact and returned.
    """
    html = markdown.markdown(markdown_text)
    create_markdown_artifact(key="html-report", markdown=html)
    return html


@task
def send_email(html_content: str) -> None:
    """Simulate sending an email with the HTML report.

    Prints the content and publishes it as the ``email-content`` artifact.
    Replace this with your actual email integration (e.g., via Gmail API or
    smtplib).
    """
    print("=== Sending Email ===")
    print(html_content)
    create_markdown_artifact(key="email-content", markdown=html_content)


@task
def send_telegram_message(html_content: str) -> None:
    """Simulate sending a Telegram message with the HTML report.

    Prints the content and publishes it as the ``telegram-message`` artifact.
    Replace this with your actual Telegram API integration.
    """
    print("=== Sending Telegram Message ===")
    print(html_content)
    create_markdown_artifact(key="telegram-message", markdown=html_content)


@flow
def damus_workflow() -> None:
    """Run the full #damus analysis pipeline.

    A Prefect flow that mirrors the n8n workflow by:

    1. Fetching Nostr threads with hashtag #damus.
    2. Extracting themes and thread-level analysis via LLM.
    3. Generating a detailed report.
    4. Converting the report to HTML.
    5. Sending the report via email and Telegram.
    """
    # Step 1: Fetch Nostr content.
    content = fetch_nostr_content()

    # Step 2: Extract themes list.
    themes_list = generate_themes_list(content)

    # Step 3: Extract thread themes.
    thread_themes = generate_thread_themes(content)

    # Step 4: Merge the themes.
    merged_themes = merge_themes(themes_list, thread_themes)

    # Step 5: Generate detailed report.
    report = generate_themes_and_threads_report(merged_themes, content)

    # Step 6: Convert the report from markdown to HTML.
    html_report = convert_markdown_to_html(report)

    # Step 7: Simulate sending the report via email.
    send_email(html_report)

    # Step 8: Simulate sending the report via Telegram.
    send_telegram_message(html_report)


if __name__ == "__main__":
    damus_workflow()