Apache Cassandra®는 트랜잭션 애플리케이션 데이터를 저장하는 데 널리 사용되는 데이터베이스입니다. Large Language Model에 함수와 도구가 도입되면서 생성형 AI 애플리케이션에서 기존 데이터를 활용할 수 있는 흥미로운 사용 사례들이 열렸습니다.
Cassandra Database toolkit은 AI 엔지니어가 Cassandra 데이터와 agent를 효율적으로 통합할 수 있도록 하며, 다음과 같은 기능을 제공합니다:
  • 최적화된 쿼리를 통한 빠른 데이터 액세스. 대부분의 쿼리는 한 자릿수 ms 이하로 실행됩니다.
  • LLM 추론 능력을 향상시키는 스키마 introspection
  • Apache Cassandra®, DataStax Enterprise™, DataStax Astra™를 포함한 다양한 Cassandra 배포와의 호환성
  • 현재 toolkit은 SELECT 쿼리와 스키마 introspection 작업으로 제한됩니다. (안전 우선)
Cassandra DB agent 생성에 대한 자세한 내용은 CQL agent cookbook을 참조하세요

Quick Start

  • cassio 라이브러리 설치
  • 연결할 Cassandra 데이터베이스에 대한 환경 변수 설정
  • CassandraDatabase 초기화
  • toolkit.get_tools()로 agent에 도구 전달
  • 편히 앉아서 모든 작업이 자동으로 수행되는 것을 지켜보세요

Theory of Operation

Cassandra Query Language (CQL)는 Cassandra 데이터베이스와 상호작용하는 주요 인간 중심 방법입니다. 쿼리 생성 시 어느 정도의 유연성을 제공하지만, Cassandra 데이터 모델링 모범 사례에 대한 지식이 필요합니다. LLM function calling은 agent에게 추론한 후 요청을 충족할 도구를 선택할 수 있는 능력을 제공합니다. LLM을 사용하는 Agent는 적절한 toolkit 또는 toolkit 체인을 선택할 때 Cassandra 특정 로직을 사용하여 추론해야 합니다. 이는 LLM이 하향식 솔루션을 제공하도록 강요될 때 발생하는 무작위성을 줄입니다. LLM이 데이터베이스에 완전히 제한 없이 액세스하도록 하시겠습니까? 네. 아마 아닐 겁니다. 이를 달성하기 위해 agent에 대한 질문을 구성할 때 사용할 prompt를 제공합니다: You are an Apache Cassandra expert query analysis bot with the following features and rules:
  • You will take a question from the end user about finding specific data in the database.
  • You will examine the schema of the database and create a query path.
  • You will provide the user with the correct query to find the data they are looking for, showing the steps provided by the query path.
  • You will use best practices for querying Apache Cassandra using partition keys and clustering columns.
  • Avoid using ALLOW FILTERING in the query.
  • The goal is to find a query path, so it may take querying other tables to get to the final answer.
The following is an example of a query path in JSON format:
 {
  "query_paths": [
    {
      "description": "Direct query to users table using email",
      "steps": [
        {
          "table": "user_credentials",
          "query":
             "SELECT userid FROM user_credentials WHERE email = '[email protected]';"
        },
        {
          "table": "users",
          "query": "SELECT * FROM users WHERE userid = ?;"
        }
      ]
    }
  ]
}

Tools Provided

cassandra_db_schema

연결된 데이터베이스 또는 특정 스키마에 대한 모든 스키마 정보를 수집합니다. Agent가 작업을 결정할 때 중요합니다.

cassandra_db_select_table_data

특정 keyspace와 table에서 데이터를 선택합니다. Agent는 predicate와 반환되는 레코드 수 제한에 대한 매개변수를 전달할 수 있습니다.

cassandra_db_query

매개변수 대신 agent가 완전히 구성한 쿼리 문자열을 받는 cassandra_db_select_table_data의 실험적 대안입니다. 경고: 이는 성능이 떨어지거나(또는 작동하지 않을 수도 있는) 비정상적인 쿼리로 이어질 수 있습니다. 향후 릴리스에서 제거될 수 있습니다. 멋진 작업을 수행한다면 그것도 알고 싶습니다. 알 수 없는 일이죠!

Environment Setup

다음 Python 모듈을 설치하세요:
pip install ipykernel python-dotenv cassio langchain-openai langchain langchain-community langchainhub

.env file

연결은 auto=True 매개변수를 사용하는 cassio를 통해 이루어지며, notebook은 OpenAI를 사용합니다. 이에 따라 .env 파일을 생성해야 합니다. Cassandra의 경우 다음을 설정하세요:
CASSANDRA_CONTACT_POINTS
CASSANDRA_USERNAME
CASSANDRA_PASSWORD
CASSANDRA_KEYSPACE
Astra의 경우 다음을 설정하세요:
ASTRA_DB_APPLICATION_TOKEN
ASTRA_DB_DATABASE_ID
ASTRA_DB_KEYSPACE
예시:
# Connection to Astra:
ASTRA_DB_DATABASE_ID=a1b2c3d4-...
ASTRA_DB_APPLICATION_TOKEN=AstraCS:...
ASTRA_DB_KEYSPACE=notebooks

# Also set
OPENAI_API_KEY=sk-....
(아래 코드를 수정하여 cassio로 직접 연결할 수도 있습니다.)
from dotenv import load_dotenv

load_dotenv(override=True)
# Import necessary libraries
import os

import cassio
from langchain_classic import hub
from langchain.agents import AgentExecutor, create_openai_tools_agent
from langchain_community.agent_toolkits.cassandra_database.toolkit import (
    CassandraDatabaseToolkit,
)
from langchain_community.tools.cassandra_database.prompt import QUERY_PATH_PROMPT
from langchain_community.utilities.cassandra_database import CassandraDatabase
from langchain_openai import ChatOpenAI

Connect to a Cassandra Database

cassio.init(auto=True)
session = cassio.config.resolve_session()
if not session:
    raise Exception(
        "Check environment configuration or manually configure cassio connection parameters"
    )
# Test data pep

session = cassio.config.resolve_session()

session.execute("""DROP KEYSPACE IF EXISTS langchain_agent_test; """)

session.execute(
    """
CREATE KEYSPACE if not exists langchain_agent_test
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
"""
)

session.execute(
    """
    CREATE TABLE IF NOT EXISTS langchain_agent_test.user_credentials (
    user_email text PRIMARY KEY,
    user_id UUID,
    password TEXT
);
"""
)

session.execute(
    """
    CREATE TABLE IF NOT EXISTS langchain_agent_test.users (
    id UUID PRIMARY KEY,
    name TEXT,
    email TEXT
);"""
)

session.execute(
    """
    CREATE TABLE IF NOT EXISTS langchain_agent_test.user_videos (
    user_id UUID,
    video_id UUID,
    title TEXT,
    description TEXT,
    PRIMARY KEY (user_id, video_id)
);
"""
)

user_id = "522b1fe2-2e36-4cef-a667-cd4237d08b89"
video_id = "27066014-bad7-9f58-5a30-f63fe03718f6"

session.execute(
    f"""
    INSERT INTO langchain_agent_test.user_credentials (user_id, user_email)
    VALUES ({user_id}, '[email protected]');
"""
)

session.execute(
    f"""
    INSERT INTO langchain_agent_test.users (id, name, email)
    VALUES ({user_id}, 'Patrick McFadin', '[email protected]');
"""
)

session.execute(
    f"""
    INSERT INTO langchain_agent_test.user_videos (user_id, video_id, title)
    VALUES ({user_id}, {video_id}, 'Use Langflow to Build a LangChain LLM Application in 5 Minutes');
"""
)

session.set_keyspace("langchain_agent_test")
# Create a CassandraDatabase instance
# Uses the cassio session to connect to the database
db = CassandraDatabase()
# Choose the LLM that will drive the agent
# Only certain models support this
llm = ChatOpenAI(temperature=0, model="gpt-4-1106-preview")
toolkit = CassandraDatabaseToolkit(db=db)

tools = toolkit.get_tools()

print("Available tools:")
for tool in tools:
    print(tool.name + "\t- " + tool.description)
Available tools:
cassandra_db_schema -
    Input to this tool is a keyspace name, output is a table description
    of Apache Cassandra tables.
    If the query is not correct, an error message will be returned.
    If an error is returned, report back to the user that the keyspace
    doesn't exist and stop.

cassandra_db_query -
    Execute a CQL query against the database and get back the result.
    If the query is not correct, an error message will be returned.
    If an error is returned, rewrite the query, check the query, and try again.

cassandra_db_select_table_data -
    Tool for getting data from a table in an Apache Cassandra database.
    Use the WHERE clause to specify the predicate for the query that uses the
    primary key. A blank predicate will return all rows. Avoid this if possible.
    Use the limit to specify the number of rows to return. A blank limit will
    return all rows.
prompt = hub.pull("hwchase17/openai-tools-agent")

# Construct the OpenAI Tools agent
agent = create_openai_tools_agent(llm, tools, prompt)
input = (
    QUERY_PATH_PROMPT
    + "\n\nHere is your task: Find all the videos that the user with the email address '[email protected]' has uploaded to the langchain_agent_test keyspace."
)

agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)

response = agent_executor.invoke({"input": input})

print(response["output"])
> Entering new AgentExecutor chain...

Invoking: `cassandra_db_schema` with `{'keyspace': 'langchain_agent_test'}`


Table Name: user_credentials
- Keyspace: langchain_agent_test
- Columns
  - password (text)
  - user_email (text)
  - user_id (uuid)
- Partition Keys: (user_email)
- Clustering Keys:

Table Name: user_videos
- Keyspace: langchain_agent_test
- Columns
  - description (text)
  - title (text)
  - user_id (uuid)
  - video_id (uuid)
- Partition Keys: (user_id)
- Clustering Keys: (video_id asc)


Table Name: users
- Keyspace: langchain_agent_test
- Columns
  - email (text)
  - id (uuid)
  - name (text)
- Partition Keys: (id)
- Clustering Keys:


Invoking: `cassandra_db_select_table_data` with `{'keyspace': 'langchain_agent_test', 'table': 'user_credentials', 'predicate': "user_email = '[email protected]'", 'limit': 1}`


Row(user_email='[email protected]', password=None, user_id=UUID('522b1fe2-2e36-4cef-a667-cd4237d08b89'))
Invoking: `cassandra_db_select_table_data` with `{'keyspace': 'langchain_agent_test', 'table': 'user_videos', 'predicate': 'user_id = 522b1fe2-2e36-4cef-a667-cd4237d08b89', 'limit': 10}`


Row(user_id=UUID('522b1fe2-2e36-4cef-a667-cd4237d08b89'), video_id=UUID('27066014-bad7-9f58-5a30-f63fe03718f6'), description='DataStax Academy is a free resource for learning Apache Cassandra.', title='DataStax Academy')To find all the videos that the user with the email address '[email protected]' has uploaded to the `langchain_agent_test` keyspace, we can follow these steps:

1. Query the `user_credentials` table to find the `user_id` associated with the email '[email protected]'.
2. Use the `user_id` obtained from the first step to query the `user_videos` table to retrieve all the videos uploaded by the user.

Here is the query path in JSON format:

\`\`\`json
{
  "query_paths": [
    {
      "description": "Find user_id from user_credentials and then query user_videos for all videos uploaded by the user",
      "steps": [
        {
          "table": "user_credentials",
          "query": "SELECT user_id FROM user_credentials WHERE user_email = '[email protected]';"
        },
        {
          "table": "user_videos",
          "query": "SELECT * FROM user_videos WHERE user_id = 522b1fe2-2e36-4cef-a667-cd4237d08b89;"
        }
      ]
    }
  ]
}
\`\`\`

Following this query path, we found that the user with the user_id `522b1fe2-2e36-4cef-a667-cd4237d08b89` has uploaded at least one video with the title 'DataStax Academy' and the description 'DataStax Academy is a free resource for learning Apache Cassandra.' The video_id for this video is `27066014-bad7-9f58-5a30-f63fe03718f6`. If there are more videos, the same query can be used to retrieve them, possibly with an increased limit if necessary.

> Finished chain.
To find all the videos that the user with the email address '[email protected]' has uploaded to the `langchain_agent_test` keyspace, we can follow these steps:

1. Query the `user_credentials` table to find the `user_id` associated with the email '[email protected]'.
2. Use the `user_id` obtained from the first step to query the `user_videos` table to retrieve all the videos uploaded by the user.

Here is the query path in JSON format:

\`\`\`json
{
  "query_paths": [
    {
      "description": "Find user_id from user_credentials and then query user_videos for all videos uploaded by the user",
      "steps": [
        {
          "table": "user_credentials",
          "query": "SELECT user_id FROM user_credentials WHERE user_email = '[email protected]';"
        },
        {
          "table": "user_videos",
          "query": "SELECT * FROM user_videos WHERE user_id = 522b1fe2-2e36-4cef-a667-cd4237d08b89;"
        }
      ]
    }
  ]
}
\`\`\`

Following this query path, we found that the user with the user_id `522b1fe2-2e36-4cef-a667-cd4237d08b89` has uploaded at least one video with the title 'DataStax Academy' and the description 'DataStax Academy is a free resource for learning Apache Cassandra.' The video_id for this video is `27066014-bad7-9f58-5a30-f63fe03718f6`. If there are more videos, the same query can be used to retrieve them, possibly with an increased limit if necessary.

Connect these docs programmatically to Claude, VSCode, and more via MCP for real-time answers.
I