Workflow Automation Example
Build intelligent automation workflows that combine multiple AI operations, handle complex logic, and scale with your needs.

Overview
This example covers:
- Multi-step workflows
- Conditional logic
- Error handling
- Parallel processing
- Workflow monitoring
Basic Workflow Setup
Copy
Ask AI
from meibelai import Meibelai
import os
from datetime import datetime
from typing import Dict, List, Optional

# Initialize the Meibelai client; the API key is read from the environment
# so credentials never appear in source code.
client = Meibelai(
    api_key_header=os.getenv("MEIBELAI_API_KEY_HEADER")
)

# Create workflow datasource: the examples below store workflow data and
# execution logs in this datasource.
workflow_db = client.datasources.create(
    name="Workflow Automation",
    description="Data for automated workflows"
)
Simple Sequential Workflow
Copy
Ask AI
class SimpleWorkflow:
    """A fixed three-stage pipeline: analyze -> process -> generate output.

    Every completed stage is recorded in ``steps_completed`` so a failure
    report can show how far the run progressed before the error.
    """

    def __init__(self, datasource_id: str):
        self.datasource_id = datasource_id
        self.steps_completed = []

    def run(self, input_data: str) -> Dict:
        """Run all stages in order; never raises — failures are reported as a dict."""
        stages = (
            ("analysis", self.analyze_input),
            ("processing", self.process_data),
            ("output_generation", self.generate_output),
        )
        try:
            value = input_data
            for label, stage in stages:
                value = stage(value)
                self.steps_completed.append(label)
            return {
                "status": "completed",
                "output": value,
                "steps": self.steps_completed,
            }
        except Exception as e:
            return {
                "status": "failed",
                "error": str(e),
                "completed_steps": self.steps_completed,
            }

    def analyze_input(self, data: str) -> Dict:
        """Ask the model to extract key information from *data* as JSON."""
        prompt = [
            {
                "role": "system",
                "content": "Analyze the input and extract key information as JSON",
            },
            {"role": "user", "content": data},
        ]
        reply = client.rag.chat(
            messages=prompt,
            execution_control={"response_format": "json"},
        )
        # NOTE(review): content is presumably the raw JSON text rather than a
        # parsed dict — confirm against the SDK before trusting the annotation.
        return reply.choices[0].message.content

    def process_data(self, analysis: Dict) -> str:
        """Placeholder transformation applied to the analysis result."""
        return "Processed: " + format(analysis)

    def generate_output(self, processed: str) -> str:
        """Turn the processed payload into the final output text."""
        reply = client.rag.chat(
            messages=[
                {"role": "user", "content": f"Generate final output from: {processed}"}
            ]
        )
        return reply.choices[0].message.content
# Run the workflow
# Demo: execute the three-step pipeline against the datasource created above.
workflow = SimpleWorkflow(workflow_db.id)
result = workflow.run("Analyze customer feedback and generate report")
print(result)  # dict with status, output, and completed steps
Conditional Workflow with Branching
Copy
Ask AI
class ConditionalWorkflow:
    """Classify an incoming document and route it to a type-specific handler."""

    def __init__(self, datasource_id: str):
        self.datasource_id = datasource_id

    def run(self, document: str) -> Dict:
        """Classify *document*, then dispatch to the matching processor.

        Unknown classifications fall through to :meth:`handle_unknown`
        instead of raising.
        """
        # Step 1: Classify document
        doc_type = self.classify_document(document)
        # Step 2: Route based on classification
        if doc_type == "invoice":
            return self.process_invoice(document)
        elif doc_type == "contract":
            return self.process_contract(document)
        elif doc_type == "report":
            return self.process_report(document)
        else:
            return self.handle_unknown(document)

    def classify_document(self, document: str) -> str:
        """Return a lowercase label: invoice, contract, report, or other."""
        response = client.rag.chat(
            messages=[
                {
                    "role": "system",
                    "content": "Classify the document as: invoice, contract, report, or other"
                },
                {"role": "user", "content": document[:500]}  # First 500 chars
            ],
            execution_control={"temperature": 0.1}  # low temperature for stable labels
        )
        return response.choices[0].message.content.lower().strip()

    def process_invoice(self, document: str) -> Dict:
        """Extract structured invoice fields and propose follow-up steps."""
        extraction = client.rag.chat(
            messages=[
                {
                    "role": "system",
                    "content": "Extract invoice number, date, amount, and items as JSON"
                },
                {"role": "user", "content": document}
            ],
            execution_control={"response_format": "json"}
        )
        return {
            "type": "invoice",
            "extracted_data": extraction.choices[0].message.content,
            "next_steps": ["validate", "approve", "process_payment"]
        }

    def process_contract(self, document: str) -> Dict:
        """Extract key terms and run a risk analysis; always flags for review."""
        extraction = client.rag.chat(
            messages=[
                {
                    "role": "system",
                    "content": "Extract parties, terms, dates, and obligations from contract"
                },
                {"role": "user", "content": document}
            ]
        )
        # Check for risks in a second, independent model call.
        risk_analysis = client.rag.chat(
            messages=[
                {"role": "user", "content": f"Identify any risks in this contract: {document}"}
            ]
        )
        return {
            "type": "contract",
            "key_terms": extraction.choices[0].message.content,
            "risk_analysis": risk_analysis.choices[0].message.content,
            "requires_review": True
        }

    def process_report(self, document: str) -> Dict:
        """Produce a short summary plus extracted action items."""
        summary = client.rag.chat(
            messages=[
                {"role": "user", "content": f"Summarize this report: {document}"}
            ],
            execution_control={"max_tokens": 200}  # keep the summary brief
        )
        return {
            "type": "report",
            "summary": summary.choices[0].message.content,
            "action_items": self.extract_action_items(document)
        }

    def extract_action_items(self, document: str) -> List[str]:
        """Ask the model for the report's action items as a JSON array."""
        response = client.rag.chat(
            messages=[
                {
                    "role": "system",
                    "content": "Extract action items as a JSON array"
                },
                {"role": "user", "content": document}
            ],
            execution_control={"response_format": "json"}
        )
        # NOTE(review): presumably the raw JSON text, not a parsed list —
        # confirm against the SDK.
        return response.choices[0].message.content

    def handle_unknown(self, document: str) -> Dict:
        """Fallback for unrecognized document types.

        FIX: run() dispatched to this method but it was never defined, so any
        document classified outside the known set raised AttributeError.
        """
        return {
            "type": "unknown",
            "preview": document[:200],
            "requires_review": True
        }
Parallel Processing Workflow
Copy
Ask AI
import asyncio
from meibelai import AsyncMeibelai
class ParallelWorkflow:
    """Fan out document analyses concurrently via the async client."""

    def __init__(self):
        self.client = AsyncMeibelai(
            api_key_header=os.getenv("MEIBELAI_API_KEY_HEADER")
        )

    async def process_multiple_documents(self, documents: List[str]) -> List[Dict]:
        """Analyze every document concurrently; results keep input order."""
        pending = (self.process_single_document(doc) for doc in documents)
        return list(await asyncio.gather(*pending))

    async def process_single_document(self, document: str) -> Dict:
        """Run sentiment, summary, and entity extraction concurrently for one document."""
        sentiment, summary, entities = await asyncio.gather(
            self.analyze_sentiment(document),
            self.generate_summary(document),
            self.extract_entities(document),
        )
        return {
            "document": f"{document[:100]}...",  # truncated preview only
            "sentiment": sentiment,
            "summary": summary,
            "entities": entities,
            "processed_at": datetime.now().isoformat(),
        }

    async def analyze_sentiment(self, text: str) -> str:
        """Return a terse sentiment label for *text*."""
        reply = await self.client.rag.chat(
            messages=[{"role": "user", "content": f"Sentiment of: {text}"}],
            execution_control={"max_tokens": 20},
        )
        return reply.choices[0].message.content

    async def generate_summary(self, text: str) -> str:
        """Return a short summary of *text*."""
        reply = await self.client.rag.chat(
            messages=[{"role": "user", "content": f"Summarize: {text}"}],
            execution_control={"max_tokens": 100},
        )
        return reply.choices[0].message.content

    async def extract_entities(self, text: str) -> List[str]:
        """Ask the model for named entities as a JSON array."""
        prompt = [
            {
                "role": "system",
                "content": "Extract named entities as JSON array",
            },
            {"role": "user", "content": text},
        ]
        reply = await self.client.rag.chat(
            messages=prompt,
            execution_control={"response_format": "json"},
        )
        # NOTE(review): presumably raw JSON text, not a parsed list — confirm.
        return reply.choices[0].message.content
# Run parallel workflow
async def main():
    """Demo: process three sample documents concurrently."""
    sample_docs = [
        "Apple announced new products today...",
        "The stock market showed gains...",
        "Climate change impacts continue..."
    ]
    runner = ParallelWorkflow()
    print(await runner.process_multiple_documents(sample_docs))
# asyncio.run(main())
Error Handling and Retry Logic
Copy
Ask AI
class RobustWorkflow:
    """Layered processing: retried primary, then secondary, then basic fallback."""

    def __init__(self, datasource_id: str):
        self.datasource_id = datasource_id
        # Number of attempts run_with_retry makes before giving up.
        self.max_retries = 3

    def run_with_retry(self, operation, *args, **kwargs):
        """Run *operation* with exponential backoff retry.

        Retries up to ``self.max_retries`` times, sleeping 1s, 2s, 4s, ...
        between attempts; re-raises the final exception.
        """
        import time  # FIX: time.sleep was called below but `time` was never imported

        for attempt in range(self.max_retries):
            try:
                return operation(*args, **kwargs)
            except Exception:
                if attempt == self.max_retries - 1:
                    raise  # out of attempts — surface the original error
                wait_time = 2 ** attempt
                print(f"Attempt {attempt + 1} failed, retrying in {wait_time}s...")
                time.sleep(wait_time)

    def process_with_fallback(self, data: str) -> Dict:
        """Try primary (with retry), then secondary, then the basic fallback.

        Never raises: the final fallback path returns a dict carrying both
        earlier error messages plus a best-effort result.
        """
        try:
            # Try primary processing
            result = self.run_with_retry(self.primary_process, data)
            return {"status": "success", "method": "primary", "result": result}
        except Exception as e:
            print(f"Primary processing failed: {e}")
            try:
                # Fallback to secondary method
                result = self.secondary_process(data)
                return {"status": "success", "method": "secondary", "result": result}
            except Exception as e2:
                # Final fallback — always succeeds.
                return {
                    "status": "failed",
                    "errors": [str(e), str(e2)],
                    "fallback_result": self.basic_process(data)
                }

    def primary_process(self, data: str) -> str:
        """Complex analysis with a tight timeout; expected to fail occasionally."""
        response = client.rag.chat(
            messages=[{"role": "user", "content": f"Complex analysis of: {data}"}],
            execution_control={"timeout": 10}
        )
        return response.choices[0].message.content

    def secondary_process(self, data: str) -> str:
        """Simpler backup analysis with a more generous timeout."""
        response = client.rag.chat(
            messages=[{"role": "user", "content": f"Basic analysis of: {data}"}],
            execution_control={"timeout": 20}
        )
        return response.choices[0].message.content

    def basic_process(self, data: str) -> str:
        """Last-resort processing that cannot fail; echoes a truncated input."""
        return f"Basic processing completed for: {data[:50]}..."
Workflow Monitoring and Logging
Copy
Ask AI
class MonitoredWorkflow:
    """Workflow that persists a log entry for every step it executes."""

    def __init__(self, datasource_id: str):
        self.datasource_id = datasource_id
        # Unique-enough id so log entries from separate runs can be filtered apart.
        self.workflow_id = f"workflow_{datetime.now().timestamp()}"

    def log_step(self, step_name: str, status: str, details: Optional[Dict] = None):
        """Log a workflow step to the datasource.

        FIX: ``details`` was annotated ``Dict = None``, which is an invalid
        type for a None default; it is ``Optional[Dict]``.
        """
        client.dataelements.create(
            datasource_id=self.datasource_id,
            name=f"Workflow Log - {self.workflow_id}",
            content=f"Step: {step_name}, Status: {status}, Details: {details}",
            metadata={
                "workflow_id": self.workflow_id,
                "step": step_name,
                "status": status,
                "timestamp": datetime.now().isoformat()
            }
        )

    def run_monitored_workflow(self, input_data: str) -> Dict:
        """Run validate -> process -> finalize, logging each transition.

        Re-raises any step failure after logging it, so callers still see the
        original exception.

        NOTE(review): validate_input, process, and finalize are not defined in
        this class — presumably supplied by a subclass; confirm before use.
        """
        self.log_step("start", "initiated", {"input": input_data[:100]})
        try:
            # Step 1
            self.log_step("validation", "running")
            validation = self.validate_input(input_data)
            self.log_step("validation", "completed", validation)
            # Step 2
            self.log_step("processing", "running")
            result = self.process(validation)
            self.log_step("processing", "completed", {"result_size": len(str(result))})
            # Step 3
            self.log_step("finalization", "running")
            final = self.finalize(result)
            self.log_step("finalization", "completed")
            self.log_step("complete", "success", {"output": final})
            return {"status": "success", "result": final}
        except Exception as e:
            self.log_step("error", "failed", {"error": str(e)})
            raise

    def get_workflow_history(self) -> List[Dict]:
        """Retrieve this run's execution history from the log datasource."""
        response = client.rag.chat(
            messages=[
                {"role": "user", "content": f"List all steps for workflow {self.workflow_id}"}
            ],
            datasource_ids=[self.datasource_id],
            execution_control={
                "metadata_filter": {"workflow_id": self.workflow_id}
            }
        )
        # NOTE(review): returns the chat message content, presumably text,
        # not a parsed List[Dict] — confirm against the SDK.
        return response.choices[0].message.content
Best Practices
- Modular Design: Break workflows into reusable components
- Error Handling: Always implement proper error handling
- Monitoring: Log all workflow steps for debugging
- Timeouts: Set appropriate timeouts for each step
- Testing: Test each workflow component independently
- Documentation: Document workflow logic and dependencies