---
title: Valthera
---

사용자가 응답할 가능성이 가장 높을 때 AI 에이전트가 사용자와 상호작용할 수 있도록 합니다.

## Overview

Valthera는 LLM Agent가 더 의미 있는 방식으로 사용자와 상호작용할 수 있도록 하는 오픈 소스 프레임워크입니다. BJ Fogg의 행동 모델(B=MAT)을 기반으로 구축되었으며, 여러 소스(HubSpot, PostHog, Snowflake 등)의 데이터를 활용하여 액션을 트리거하기 전에 사용자의 **동기(motivation)****능력(ability)**을 평가합니다.

이 가이드에서는 다음을 배웁니다:

- **핵심 개념:** 구성 요소(Data Aggregator, Scorer, Reasoning Engine, Trigger Generator)에 대한 개요.
- **시스템 아키텍처:** 시스템을 통해 데이터가 흐르는 방식과 의사 결정이 이루어지는 방식.
- **커스터마이징:** 필요에 맞게 connector, scoring metric, decision rule을 확장하는 방법.

시작해봅시다!

## Setup

이 섹션에서는 의존성 설치와 Valthera를 위한 커스텀 데이터 connector 설정을 다룹니다.

```python
pip install openai langchain langchain_openai valthera langchain_valthera langgraph
from typing import Any, Dict, List

from valthera.connectors.base import BaseConnector


class MockHubSpotConnector(BaseConnector):
    """
    Simulates data retrieval from HubSpot. Provides information such as lead score,
    lifecycle stage, and marketing metrics.
    """

    def get_user_data(self, user_id: str) -> Dict[str, Any]:
        """
        Retrieve mock HubSpot data for a given user.

        Args:
            user_id: The unique identifier for the user

        Returns:
            A dictionary containing HubSpot user data
        """
        return {
            "hubspot_contact_id": "999-ZZZ",
            "lifecycle_stage": "opportunity",
            "lead_status": "engaged",
            "hubspot_lead_score": 100,
            "company_name": "MaxMotivation Corp.",
            "last_contacted_date": "2023-09-20",
            "hubspot_marketing_emails_opened": 20,
            "marketing_emails_clicked": 10,
        }


class MockPostHogConnector(BaseConnector):
    """
    Simulates data retrieval from PostHog. Provides session data and engagement events.
    """

    def get_user_data(self, user_id: str) -> Dict[str, Any]:
        """
        Retrieve mock PostHog data for a given user.

        Args:
            user_id: The unique identifier for the user

        Returns:
            A dictionary containing PostHog user data
        """
        return {
            "distinct_ids": [user_id, f"email_{user_id}"],
            "last_event_timestamp": "2023-09-20T12:34:56Z",
            "feature_flags": ["beta_dashboard", "early_access"],
            "posthog_session_count": 30,
            "avg_session_duration_sec": 400,
            "recent_event_types": ["pageview", "button_click", "premium_feature_used"],
            "posthog_events_count_past_30days": 80,
            "posthog_onboarding_steps_completed": 5,
        }


class MockSnowflakeConnector(BaseConnector):
    """
    Simulates retrieval of additional user profile data from Snowflake.
    """

    def get_user_data(self, user_id: str) -> Dict[str, Any]:
        """
        Retrieve mock Snowflake data for a given user.

        Args:
            user_id: The unique identifier for the user

        Returns:
            A dictionary containing Snowflake user data
        """
        return {
            "user_id": user_id,
            "email": f"{user_id}@example.com",
            "subscription_status": "paid",
            "plan_tier": "premium",
            "account_creation_date": "2023-01-01",
            "preferred_language": "en",
            "last_login_datetime": "2023-09-20T12:00:00Z",
            "behavior_complexity": 3,
        }

Instantiation

이 섹션에서는 핵심 구성 요소를 인스턴스화합니다. 먼저, 커스텀 connector의 데이터를 결합하기 위한 Data Aggregator를 생성합니다. 그런 다음, 동기와 능력에 대한 scoring metric을 구성합니다.
from valthera.aggregator import DataAggregator

# Constants for configuration
LEAD_SCORE_MAX = 100
EVENTS_COUNT_MAX = 50
EMAILS_OPENED_FACTOR = 10.0
SESSION_COUNT_FACTOR_1 = 5.0
ONBOARDING_STEPS_FACTOR = 5.0
SESSION_COUNT_FACTOR_2 = 10.0
BEHAVIOR_COMPLEXITY_MAX = 5.0

# Initialize data aggregator
data_aggregator = DataAggregator(
    connectors={
        "hubspot": MockHubSpotConnector(),
        "posthog": MockPostHogConnector(),
        "snowflake": MockSnowflakeConnector(),
    }
)

# You can now fetch unified user data by calling data_aggregator.get_user_context(user_id)
from typing import Callable, Union

from valthera.scorer import ValtheraScorer


# Define transform functions with proper type annotations
def transform_lead_score(x: Union[int, float]) -> float:
    """Transform lead score to a value between 0 and 1."""
    return min(x, LEAD_SCORE_MAX) / LEAD_SCORE_MAX


def transform_events_count(x: Union[int, float]) -> float:
    """Transform events count to a value between 0 and 1."""
    return min(x, EVENTS_COUNT_MAX) / EVENTS_COUNT_MAX


def transform_emails_opened(x: Union[int, float]) -> float:
    """Transform emails opened to a value between 0 and 1."""
    return min(x / EMAILS_OPENED_FACTOR, 1.0)


def transform_session_count_1(x: Union[int, float]) -> float:
    """Transform session count for motivation to a value between 0 and 1."""
    return min(x / SESSION_COUNT_FACTOR_1, 1.0)


def transform_onboarding_steps(x: Union[int, float]) -> float:
    """Transform onboarding steps to a value between 0 and 1."""
    return min(x / ONBOARDING_STEPS_FACTOR, 1.0)


def transform_session_count_2(x: Union[int, float]) -> float:
    """Transform session count for ability to a value between 0 and 1."""
    return min(x / SESSION_COUNT_FACTOR_2, 1.0)


def transform_behavior_complexity(x: Union[int, float]) -> float:
    """Transform behavior complexity to a value between 0 and 1."""
    return 1 - (min(x, BEHAVIOR_COMPLEXITY_MAX) / BEHAVIOR_COMPLEXITY_MAX)


# Scoring configuration for user motivation
motivation_config = [
    {"key": "hubspot_lead_score", "weight": 0.30, "transform": transform_lead_score},
    {
        "key": "posthog_events_count_past_30days",
        "weight": 0.30,
        "transform": transform_events_count,
    },
    {
        "key": "hubspot_marketing_emails_opened",
        "weight": 0.20,
        "transform": transform_emails_opened,
    },
    {
        "key": "posthog_session_count",
        "weight": 0.20,
        "transform": transform_session_count_1,
    },
]

# Scoring configuration for user ability
ability_config = [
    {
        "key": "posthog_onboarding_steps_completed",
        "weight": 0.30,
        "transform": transform_onboarding_steps,
    },
    {
        "key": "posthog_session_count",
        "weight": 0.30,
        "transform": transform_session_count_2,
    },
    {
        "key": "behavior_complexity",
        "weight": 0.40,
        "transform": transform_behavior_complexity,
    },
]

# Instantiate the scorer
scorer = ValtheraScorer(motivation_config, ability_config)

Invocation

다음으로, Reasoning Engine과 Trigger Generator를 설정한 다음, Valthera Tool을 인스턴스화하여 모든 구성 요소를 통합합니다. 마지막으로, 입력 메시지를 처리하기 위해 에이전트 워크플로우를 실행합니다.
import os

from langchain_openai import ChatOpenAI
from valthera.reasoning_engine import ReasoningEngine

# Define threshold as constant
SCORE_THRESHOLD = 0.75


# Function to safely get API key
def get_openai_api_key() -> str:
    """Get OpenAI API key with error handling."""
    api_key = os.environ.get("OPENAI_API_KEY")
    if not api_key:
        raise ValueError("OPENAI_API_KEY not found in environment variables")
    return api_key


# Decision rules using constant
decision_rules = [
    {
        "condition": f"motivation >= {SCORE_THRESHOLD} and ability >= {SCORE_THRESHOLD}",
        "action": "trigger",
        "description": "Both scores are high enough.",
    },
    {
        "condition": f"motivation < {SCORE_THRESHOLD}",
        "action": "improve_motivation",
        "description": "User motivation is low.",
    },
    {
        "condition": f"ability < {SCORE_THRESHOLD}",
        "action": "improve_ability",
        "description": "User ability is low.",
    },
    {
        "condition": "otherwise",
        "action": "defer",
        "description": "No action needed at this time.",
    },
]

try:
    api_key = get_openai_api_key()
    reasoning_engine = ReasoningEngine(
        llm=ChatOpenAI(
            model_name="gpt-4-turbo", temperature=0.0, openai_api_key=api_key
        ),
        decision_rules=decision_rules,
    )
except ValueError as e:
    print(f"Error initializing reasoning engine: {e}")
from valthera.trigger_generator import TriggerGenerator

try:
    api_key = get_openai_api_key()  # Reuse the function for consistency
    trigger_generator = TriggerGenerator(
        llm=ChatOpenAI(
            model_name="gpt-4-turbo", temperature=0.7, openai_api_key=api_key
        )
    )
except ValueError as e:
    print(f"Error initializing trigger generator: {e}")
from langchain_valthera.tools import ValtheraTool
from langchain.agents import create_agent


try:
    api_key = get_openai_api_key()

    # Initialize Valthera tool
    valthera_tool = ValtheraTool(
        data_aggregator=data_aggregator,
        motivation_config=motivation_config,
        ability_config=ability_config,
        reasoning_engine=reasoning_engine,
        trigger_generator=trigger_generator,
    )

    # Create agent with LLM
    model = ChatOpenAI(model_name="gpt-4-turbo", temperature=0.0, openai_api_key=api_key)
    tools = [valthera_tool]
    graph = create_agent(model, tools=tools)

    # Define input message for testing
    inputs = {
        "messages": [("user", "Evaluate behavior for user_12345: Finish Onboarding")]
    }

    # Process the input and display responses
    print("Running Valthera agent workflow...")
    for response in graph.stream(inputs, stream_mode="values"):
        print(response)

except Exception as e:
    print(f"Error running Valthera workflow: {e}")

Chaining

이 통합은 현재 chaining 작업을 지원하지 않습니다. 향후 릴리스에서 chaining 지원이 포함될 수 있습니다.

API reference

다음은 Valthera 통합에서 제공하는 주요 API에 대한 개요입니다:
  • Data Aggregator: data_aggregator.get_user_context(user_id)를 사용하여 집계된 사용자 데이터를 가져옵니다.
  • Scorer: ValtheraScorer는 제공된 구성을 기반으로 동기 및 능력 점수를 계산합니다.
  • Reasoning Engine: ReasoningEngine은 decision rule을 평가하여 적절한 액션(trigger, improve motivation, improve ability, defer)을 결정합니다.
  • Trigger Generator: LLM을 사용하여 개인화된 trigger 메시지를 생성합니다.
  • Valthera Tool: 모든 구성 요소를 통합하여 입력을 처리하고 에이전트 워크플로우를 실행합니다.
자세한 사용법은 소스 코드의 인라인 문서를 참조하세요.

---

<Callout icon="pen-to-square" iconType="regular">
    [Edit the source of this page on GitHub.](https://github.com/langchain-ai/docs/edit/main/src/oss/python/integrations/tools/valthera.mdx)
</Callout>
<Tip icon="terminal" iconType="regular">
    [Connect these docs programmatically](/use-these-docs) to Claude, VSCode, and more via MCP for    real-time answers.
</Tip>
I