| | """ |
| | Observability module for Langfuse v3 integration with OpenTelemetry support. |
| | |
| | This module provides: |
| | - Single global CallbackHandler for LangChain integration |
| | - Root span management for user requests |
| | - Session and user tracking |
| | - Background flushing for async operations |
| | """ |
| |
|
| | import os |
| | import base64 |
| | from typing import Optional, Dict, Any |
| | from contextlib import contextmanager |
| | from dotenv import load_dotenv |
| |
|
| | |
| | from langfuse import get_client |
| | from langfuse.langchain import CallbackHandler |
| |
|
| | |
| | from opentelemetry import trace |
| | from opentelemetry.sdk.trace import TracerProvider |
| | from opentelemetry.sdk.trace.export import SimpleSpanProcessor |
| | from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter |
| |
|
| | |
| | load_dotenv("env.local") |
| |
|
| | |
| | _langfuse_handler: Optional[CallbackHandler] = None |
| | _tracer_provider: Optional[TracerProvider] = None |
| |
|
| | def initialize_observability() -> bool: |
| | """ |
| | Initialize Langfuse observability with OTEL integration. |
| | |
| | Returns: |
| | bool: True if initialization successful, False otherwise |
| | """ |
| | global _langfuse_handler, _tracer_provider |
| | |
| | try: |
| | |
| | required_vars = ["LANGFUSE_PUBLIC_KEY", "LANGFUSE_SECRET_KEY", "LANGFUSE_HOST"] |
| | missing_vars = [var for var in required_vars if not os.getenv(var)] |
| | |
| | if missing_vars: |
| | print(f"Warning: Missing required environment variables: {missing_vars}") |
| | return False |
| | |
| | |
| | langfuse_auth = base64.b64encode( |
| | f"{os.getenv('LANGFUSE_PUBLIC_KEY')}:{os.getenv('LANGFUSE_SECRET_KEY')}".encode() |
| | ).decode() |
| | |
| | |
| | os.environ['OTEL_EXPORTER_OTLP_ENDPOINT'] = f"{os.getenv('LANGFUSE_HOST')}/api/public/otel" |
| | os.environ['OTEL_EXPORTER_OTLP_HEADERS'] = f"Authorization=Basic {langfuse_auth}" |
| | |
| | |
| | _tracer_provider = TracerProvider() |
| | _tracer_provider.add_span_processor(SimpleSpanProcessor(OTLPSpanExporter())) |
| | trace.set_tracer_provider(_tracer_provider) |
| | |
| | |
| | _langfuse_handler = CallbackHandler() |
| | |
| | print("✅ Langfuse observability initialized successfully") |
| | return True |
| | |
| | except Exception as e: |
| | print(f"❌ Failed to initialize observability: {e}") |
| | return False |
| |
|
| | def get_callback_handler() -> Optional[CallbackHandler]: |
| | """ |
| | Get the global Langfuse callback handler. |
| | |
| | Returns: |
| | CallbackHandler or None if not initialized |
| | """ |
| | global _langfuse_handler |
| | |
| | if _langfuse_handler is None: |
| | if initialize_observability(): |
| | return _langfuse_handler |
| | return None |
| | |
| | return _langfuse_handler |
| |
|
| | @contextmanager |
| | def start_root_span( |
| | name: str, |
| | user_id: str, |
| | session_id: str, |
| | metadata: Optional[Dict[str, Any]] = None |
| | ): |
| | """ |
| | Context manager for creating root spans with user and session tracking. |
| | |
| | Args: |
| | name: Span name (e.g., "user-request") |
| | user_id: User identifier for session tracking |
| | session_id: Session identifier for conversation continuity |
| | metadata: Optional additional metadata |
| | |
| | Yields: |
| | Langfuse span context or None if creation fails |
| | """ |
| | span = None |
| | try: |
| | |
| | client = get_client() |
| | span = client.start_as_current_span(name=name) |
| | span_context = span.__enter__() |
| | |
| | |
| | span_context.update_trace( |
| | user_id=user_id, |
| | session_id=session_id, |
| | tags=[ |
| | os.getenv("ENV", "dev"), |
| | "multi-agent-system" |
| | ] |
| | ) |
| | |
| | |
| | if metadata: |
| | span_context.update_trace(metadata=metadata) |
| | |
| | yield span_context |
| | |
| | except Exception as e: |
| | print(f"Warning: Failed to create root span: {e}") |
| | |
| | yield None |
| | finally: |
| | |
| | if span is not None: |
| | try: |
| | span.__exit__(None, None, None) |
| | except Exception as e: |
| | print(f"Warning: Error closing span: {e}") |
| |
|
| | def flush_traces(background: bool = True) -> None: |
| | """ |
| | Flush pending traces to Langfuse. |
| | |
| | Args: |
| | background: Whether to flush in background (non-blocking) |
| | """ |
| | try: |
| | client = get_client() |
| | client.flush() |
| | except Exception as e: |
| | print(f"Warning: Failed to flush traces: {e}") |
| |
|
| | def shutdown_observability() -> None: |
| | """ |
| | Clean shutdown of observability components. |
| | """ |
| | global _tracer_provider |
| | |
| | try: |
| | |
| | flush_traces(background=False) |
| | |
| | |
| | if _tracer_provider: |
| | _tracer_provider.shutdown() |
| | |
| | except Exception as e: |
| | print(f"Warning: Error during observability shutdown: {e}") |
| |
|
| | |
| | @contextmanager |
| | def agent_span(agent_name: str, metadata: Optional[Dict[str, Any]] = None): |
| | """ |
| | Context manager for agent-level spans. |
| | |
| | Args: |
| | agent_name: Name of the agent (e.g., "lead", "research", "code") |
| | metadata: Optional metadata for the span |
| | """ |
| | span_name = f"agent/{agent_name}" |
| | span = None |
| | |
| | try: |
| | client = get_client() |
| | span = client.start_as_current_span(name=span_name) |
| | span_context = span.__enter__() |
| | |
| | if metadata: |
| | span_context.update_trace(metadata=metadata) |
| | yield span_context |
| | except Exception as e: |
| | print(f"Warning: Failed to create agent span for {agent_name}: {e}") |
| | yield None |
| | finally: |
| | if span is not None: |
| | try: |
| | span.__exit__(None, None, None) |
| | except Exception as e: |
| | print(f"Warning: Error closing agent span: {e}") |
| |
|
| | @contextmanager |
| | def tool_span(tool_name: str, metadata: Optional[Dict[str, Any]] = None): |
| | """ |
| | Context manager for tool-level spans. |
| | |
| | Args: |
| | tool_name: Name of the tool (e.g., "tavily_search", "calculator") |
| | metadata: Optional metadata for the span |
| | """ |
| | span_name = f"tool/{tool_name}" |
| | span = None |
| | |
| | try: |
| | client = get_client() |
| | span = client.start_as_current_span(name=span_name) |
| | span_context = span.__enter__() |
| | |
| | if metadata: |
| | span_context.update_trace(metadata=metadata) |
| | yield span_context |
| | except Exception as e: |
| | print(f"Warning: Failed to create tool span for {tool_name}: {e}") |
| | yield None |
| | finally: |
| | if span is not None: |
| | try: |
| | span.__exit__(None, None, None) |
| | except Exception as e: |
| | print(f"Warning: Error closing tool span: {e}") |