Skip to main content

Workflow Automation Example

Build intelligent automation workflows that combine multiple AI operations, handle complex logic, and scale with your needs.

Overview

This example covers:
  • Multi-step workflows
  • Conditional logic
  • Error handling
  • Parallel processing
  • Workflow monitoring

Basic Workflow Setup

import json
import os
import time
from datetime import datetime
from typing import Dict, List, Optional

from meibelai import Meibelai

# Initialise the Meibelai client; the API key is read from the environment
# so it never appears in source control.
client = Meibelai(
    api_key_header=os.getenv("MEIBELAI_API_KEY_HEADER")
)

# Create workflow datasource that the examples below store data in.
workflow_db = client.datasources.create(
    name="Workflow Automation",
    description="Data for automated workflows"
)

Simple Sequential Workflow

class SimpleWorkflow:
    """Run a fixed three-step pipeline: analyze -> process -> generate output.

    Each completed step name is recorded so a failure report can show how
    far the workflow got before the error occurred.
    """

    def __init__(self, datasource_id: str):
        # Datasource backing this workflow (informational here — the steps
        # below call the module-level `client` directly).
        self.datasource_id = datasource_id
        self.steps_completed = []

    def run(self, input_data: str) -> Dict:
        """Execute all steps sequentially and return a status dict.

        Returns {"status": "completed", "output": ..., "steps": [...]} on
        success, or {"status": "failed", "error": ...,
        "completed_steps": [...]} if any step raises.
        """
        # Reset progress so repeated run() calls don't accumulate step names
        # from earlier executions (previously a latent bug).
        self.steps_completed = []
        try:
            # Step 1: Analyze input
            analysis = self.analyze_input(input_data)
            self.steps_completed.append("analysis")

            # Step 2: Process based on analysis
            processed = self.process_data(analysis)
            self.steps_completed.append("processing")

            # Step 3: Generate output
            output = self.generate_output(processed)
            self.steps_completed.append("output_generation")

            return {
                "status": "completed",
                "output": output,
                "steps": self.steps_completed
            }

        except Exception as e:
            return {
                "status": "failed",
                "error": str(e),
                "completed_steps": self.steps_completed
            }

    def analyze_input(self, data: str) -> Dict:
        """Ask the model for a JSON analysis of *data* and return it parsed.

        The response is requested in JSON format, so the raw string content
        is decoded with json.loads to honour the declared Dict return type
        (previously the undecoded string was returned).
        """
        response = client.rag.chat(
            messages=[
                {
                    "role": "system",
                    "content": "Analyze the input and extract key information as JSON"
                },
                {"role": "user", "content": data}
            ],
            execution_control={"response_format": "json"}
        )
        return json.loads(response.choices[0].message.content)

    def process_data(self, analysis: Dict) -> str:
        # Placeholder transformation; a real workflow would branch on the
        # analysis content here.
        return f"Processed: {analysis}"

    def generate_output(self, processed: str) -> str:
        """Generate the final free-text output from the processed data."""
        response = client.rag.chat(
            messages=[
                {"role": "user", "content": f"Generate final output from: {processed}"}
            ]
        )
        return response.choices[0].message.content

# Run the workflow against the datasource created above.
workflow = SimpleWorkflow(workflow_db.id)
result = workflow.run("Analyze customer feedback and generate report")
# result is a dict containing "status" plus either the output and step list
# (on success) or the error and the steps completed before failure.
print(result)

Conditional Workflow with Branching

class ConditionalWorkflow:
    """Classify a document, then route it to a type-specific processor.

    Unrecognised classifications fall through to handle_unknown so the
    workflow always returns a result dict.
    """

    def __init__(self, datasource_id: str):
        self.datasource_id = datasource_id

    def run(self, document: str) -> Dict:
        """Classify *document* and dispatch to the matching processor."""
        # Step 1: Classify document
        doc_type = self.classify_document(document)

        # Step 2: Route based on classification
        if doc_type == "invoice":
            return self.process_invoice(document)
        elif doc_type == "contract":
            return self.process_contract(document)
        elif doc_type == "report":
            return self.process_report(document)
        else:
            return self.handle_unknown(document)

    def classify_document(self, document: str) -> str:
        """Return the model's classification, normalised to lowercase."""
        response = client.rag.chat(
            messages=[
                {
                    "role": "system",
                    "content": "Classify the document as: invoice, contract, report, or other"
                },
                {"role": "user", "content": document[:500]}  # First 500 chars
            ],
            # Low temperature keeps classification near-deterministic.
            execution_control={"temperature": 0.1}
        )
        return response.choices[0].message.content.lower().strip()

    def process_invoice(self, document: str) -> Dict:
        """Extract invoice details and suggest follow-up steps."""
        extraction = client.rag.chat(
            messages=[
                {
                    "role": "system",
                    "content": "Extract invoice number, date, amount, and items as JSON"
                },
                {"role": "user", "content": document}
            ],
            execution_control={"response_format": "json"}
        )

        return {
            "type": "invoice",
            "extracted_data": extraction.choices[0].message.content,
            "next_steps": ["validate", "approve", "process_payment"]
        }

    def process_contract(self, document: str) -> Dict:
        """Extract contract key terms and run a risk analysis."""
        extraction = client.rag.chat(
            messages=[
                {
                    "role": "system",
                    "content": "Extract parties, terms, dates, and obligations from contract"
                },
                {"role": "user", "content": document}
            ]
        )

        # Second pass: check the same document for risks.
        risk_analysis = client.rag.chat(
            messages=[
                {"role": "user", "content": f"Identify any risks in this contract: {document}"}
            ]
        )

        return {
            "type": "contract",
            "key_terms": extraction.choices[0].message.content,
            "risk_analysis": risk_analysis.choices[0].message.content,
            "requires_review": True
        }

    def process_report(self, document: str) -> Dict:
        """Summarise the report and pull out its action items."""
        summary = client.rag.chat(
            messages=[
                {"role": "user", "content": f"Summarize this report: {document}"}
            ],
            execution_control={"max_tokens": 200}
        )

        return {
            "type": "report",
            "summary": summary.choices[0].message.content,
            "action_items": self.extract_action_items(document)
        }

    def extract_action_items(self, document: str) -> List[str]:
        """Extract action items from *document*.

        The JSON response is parsed with json.loads so the declared
        List[str] return type holds (previously the raw string was
        returned).
        """
        response = client.rag.chat(
            messages=[
                {
                    "role": "system",
                    "content": "Extract action items as a JSON array"
                },
                {"role": "user", "content": document}
            ],
            execution_control={"response_format": "json"}
        )
        return json.loads(response.choices[0].message.content)

    def handle_unknown(self, document: str) -> Dict:
        """Fallback for documents that don't match a known type.

        Previously referenced by run() but never defined, which made the
        fallback path raise AttributeError; flags the document for manual
        review instead.
        """
        return {
            "type": "other",
            "requires_manual_review": True,
            "preview": document[:100]
        }

Parallel Processing Workflow

import asyncio
from meibelai import AsyncMeibelai

class ParallelWorkflow:
    """Fan documents out across concurrent analyses with the async client."""

    def __init__(self):
        # Async client so independent chat calls can run concurrently.
        self.client = AsyncMeibelai(
            api_key_header=os.getenv("MEIBELAI_API_KEY_HEADER")
        )

    async def process_multiple_documents(self, documents: List[str]) -> List[Dict]:
        """Process every document concurrently; results keep input order."""
        tasks = [self.process_single_document(doc) for doc in documents]
        results = await asyncio.gather(*tasks)
        return results

    async def process_single_document(self, document: str) -> Dict:
        """Run sentiment, summary, and entity extraction concurrently."""
        sentiment_task = self.analyze_sentiment(document)
        summary_task = self.generate_summary(document)
        entities_task = self.extract_entities(document)

        sentiment, summary, entities = await asyncio.gather(
            sentiment_task, summary_task, entities_task
        )

        return {
            "document": document[:100] + "...",  # truncated preview only
            "sentiment": sentiment,
            "summary": summary,
            "entities": entities,
            "processed_at": datetime.now().isoformat()
        }

    async def analyze_sentiment(self, text: str) -> str:
        """Return a short sentiment label for *text*."""
        response = await self.client.rag.chat(
            messages=[
                {"role": "user", "content": f"Sentiment of: {text}"}
            ],
            execution_control={"max_tokens": 20}
        )
        return response.choices[0].message.content

    async def generate_summary(self, text: str) -> str:
        """Return a brief summary of *text*."""
        response = await self.client.rag.chat(
            messages=[
                {"role": "user", "content": f"Summarize: {text}"}
            ],
            execution_control={"max_tokens": 100}
        )
        return response.choices[0].message.content

    async def extract_entities(self, text: str) -> List[str]:
        """Extract named entities from *text*.

        The JSON response is parsed with json.loads so the declared
        List[str] return type holds (previously the raw string was
        returned).
        """
        response = await self.client.rag.chat(
            messages=[
                {
                    "role": "system",
                    "content": "Extract named entities as JSON array"
                },
                {"role": "user", "content": text}
            ],
            execution_control={"response_format": "json"}
        )
        return json.loads(response.choices[0].message.content)

# Run parallel workflow
async def main():
    """Demonstrate the parallel workflow on a small batch of documents."""
    sample_docs = [
        "Apple announced new products today...",
        "The stock market showed gains...",
        "Climate change impacts continue..."
    ]
    runner = ParallelWorkflow()
    processed = await runner.process_multiple_documents(sample_docs)
    print(processed)

# asyncio.run(main())

Error Handling and Retry Logic

class RobustWorkflow:
    """Workflow with retry, fallback, and last-resort processing tiers."""

    def __init__(self, datasource_id: str):
        self.datasource_id = datasource_id
        # Total attempts for run_with_retry (first try included).
        self.max_retries = 3

    def run_with_retry(self, operation, *args, **kwargs):
        """Run operation with exponential backoff retry.

        Sleeps 2**attempt seconds between attempts (1s, 2s, ...) and
        re-raises the final exception once max_retries attempts are
        exhausted.
        """
        for attempt in range(self.max_retries):
            try:
                return operation(*args, **kwargs)
            except Exception:
                # On the last attempt, propagate instead of sleeping again.
                if attempt == self.max_retries - 1:
                    raise
                wait_time = 2 ** attempt
                print(f"Attempt {attempt + 1} failed, retrying in {wait_time}s...")
                time.sleep(wait_time)

    def process_with_fallback(self, data: str) -> Dict:
        """Try primary (with retries), then secondary, then basic processing.

        Never raises: the final fallback always produces a result, and the
        errors from the failed tiers are reported alongside it.
        """
        try:
            # Try primary processing
            result = self.run_with_retry(self.primary_process, data)
            return {"status": "success", "method": "primary", "result": result}
        except Exception as e:
            print(f"Primary processing failed: {e}")
            try:
                # Fallback to secondary method
                result = self.secondary_process(data)
                return {"status": "success", "method": "secondary", "result": result}
            except Exception as e2:
                # Final fallback: report both errors plus the basic result.
                return {
                    "status": "failed",
                    "errors": [str(e), str(e2)],
                    "fallback_result": self.basic_process(data)
                }

    def primary_process(self, data: str) -> str:
        """Complex analysis with a tight timeout; most likely tier to fail."""
        response = client.rag.chat(
            messages=[{"role": "user", "content": f"Complex analysis of: {data}"}],
            execution_control={"timeout": 10}
        )
        return response.choices[0].message.content

    def secondary_process(self, data: str) -> str:
        """Simpler backup analysis with a more generous timeout."""
        response = client.rag.chat(
            messages=[{"role": "user", "content": f"Basic analysis of: {data}"}],
            execution_control={"timeout": 20}
        )
        return response.choices[0].message.content

    def basic_process(self, data: str) -> str:
        # Most basic processing: no model call, so it cannot fail.
        return f"Basic processing completed for: {data[:50]}..."

Workflow Monitoring and Logging

class MonitoredWorkflow:
    """Workflow that logs each execution step to a datasource for auditing.

    NOTE(review): run_monitored_workflow calls self.validate_input,
    self.process, and self.finalize, which are not defined in this class —
    presumably meant to be implemented by the reader or a subclass; confirm.
    """
    def __init__(self, datasource_id: str):
        # Datasource that receives the per-step log records.
        self.datasource_id = datasource_id
        # Timestamp-based id so one execution's log entries can be grouped.
        self.workflow_id = f"workflow_{datetime.now().timestamp()}"
        
    def log_step(self, step_name: str, status: str, details: Dict = None):
        """Log workflow step to datasource"""
        # Each log entry is stored as a data element; the metadata mirrors
        # the content fields so entries can be filtered by workflow_id,
        # step, or status later.
        client.dataelements.create(
            datasource_id=self.datasource_id,
            name=f"Workflow Log - {self.workflow_id}",
            content=f"Step: {step_name}, Status: {status}, Details: {details}",
            metadata={
                "workflow_id": self.workflow_id,
                "step": step_name,
                "status": status,
                "timestamp": datetime.now().isoformat()
            }
        )
    
    def run_monitored_workflow(self, input_data: str) -> Dict:
        """Run the three-step workflow, logging before and after each step.

        On failure the error is logged and the exception re-raised, so
        callers still see the failure.
        """
        self.log_step("start", "initiated", {"input": input_data[:100]})
        
        try:
            # Step 1 — NOTE(review): validate_input is not defined here.
            self.log_step("validation", "running")
            validation = self.validate_input(input_data)
            self.log_step("validation", "completed", validation)
            
            # Step 2 — NOTE(review): process is not defined here.
            self.log_step("processing", "running")
            result = self.process(validation)
            self.log_step("processing", "completed", {"result_size": len(str(result))})
            
            # Step 3 — NOTE(review): finalize is not defined here.
            self.log_step("finalization", "running")
            final = self.finalize(result)
            self.log_step("finalization", "completed")
            
            self.log_step("complete", "success", {"output": final})
            return {"status": "success", "result": final}
            
        except Exception as e:
            self.log_step("error", "failed", {"error": str(e)})
            raise
    
    def get_workflow_history(self) -> List[Dict]:
        """Retrieve workflow execution history"""
        # Queries the datasource via RAG, filtered to this workflow's id.
        # NOTE(review): returns the model's text content, not a List[Dict]
        # as annotated — confirm intended return shape.
        response = client.rag.chat(
            messages=[
                {"role": "user", "content": f"List all steps for workflow {self.workflow_id}"}
            ],
            datasource_ids=[self.datasource_id],
            execution_control={
                "metadata_filter": {"workflow_id": self.workflow_id}
            }
        )
        return response.choices[0].message.content

Best Practices

  1. Modular Design: Break workflows into reusable components
  2. Error Handling: Always implement proper error handling
  3. Monitoring: Log all workflow steps for debugging
  4. Timeouts: Set appropriate timeouts for each step
  5. Testing: Test each workflow component independently
  6. Documentation: Document workflow logic and dependencies

Next Steps