Claude API — Python#

Complete Python SDK reference for the Anthropic Claude API — messages, streaming, vision, extended thinking, prompt caching, batch processing, and token counting.

Install#

pip install anthropic

Output:

Successfully installed anthropic-0.49.0

Basic message#

import anthropic

client = anthropic.Anthropic()  # reads ANTHROPIC_API_KEY from env

response = client.messages.create(
    model="claude-opus-4-7",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Explain what a Python decorator is."}]
)

print(response.content[0].text)
print(response.usage)

Output:

A Python decorator is a function that takes another function as input and returns a
modified version of it, allowing you to add behavior before or after the original
function runs without changing its source code. The @ syntax is shorthand for
function = decorator(function).

Usage(input_tokens=15, output_tokens=62, cache_creation_input_tokens=0, cache_read_input_tokens=0)

Response object#

print(response.id)            # "msg_01XVn..."
print(response.model)         # "claude-opus-4-7-20251001"
print(response.stop_reason)   # "end_turn" | "max_tokens" | "tool_use" | "stop_sequence"
print(response.usage.input_tokens)
print(response.usage.output_tokens)

# Content blocks
for block in response.content:
    print(block.type)    # "text"
    print(block.text)
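When a response contains several blocks, it is handy to join just the text ones. A minimal helper sketch (not part of the SDK), assuming each block exposes `.type` and `.text`:

```python
def response_text(content):
    """Concatenate the text of all text-type content blocks."""
    return "".join(block.text for block in content if block.type == "text")
```

Works on any iterable of block-like objects, including `response.content`.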

Multi-turn conversation#

messages = [
    {"role": "user", "content": "What is 2 + 2?"},
    {"role": "assistant", "content": "4"},
    {"role": "user", "content": "Multiply that by 10."}
]

response = client.messages.create(
    model="claude-opus-4-7",
    max_tokens=256,
    messages=messages
)
print(response.content[0].text)

Output:

40

System prompt#

response = client.messages.create(
    model="claude-opus-4-7",
    max_tokens=512,
    system="You are a concise technical documentation writer. Reply in bullet points only.",
    messages=[{"role": "user", "content": "How does TCP handle packet loss?"}]
)
print(response.content[0].text)

Output:

- Sender sets a retransmission timer when a segment is sent
- If ACK not received before timeout, the segment is retransmitted
- Receiver uses sequence numbers to detect duplicates and reorder out-of-order segments
- Duplicate ACKs (3 in a row) trigger fast retransmit before the timer expires
- Congestion window is reduced on loss to slow the send rate

Streaming#

Use client.messages.stream() to receive tokens as they arrive.

with client.messages.stream(
    model="claude-opus-4-7",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Count to 5 slowly."}]
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)
    print()

    # Get the full message once the stream completes
    message = stream.get_final_message()

print(f"\nTotal tokens: {message.usage.input_tokens + message.usage.output_tokens}")

Output:

One... two... three... four... five.

Total tokens: 31

Low-level streaming events#

with client.messages.stream(
    model="claude-opus-4-7",
    max_tokens=256,
    messages=[{"role": "user", "content": "Hi"}]
) as stream:
    for event in stream:
        print(event.type)

Output:

message_start
content_block_start
content_block_delta
content_block_delta
content_block_stop
message_delta
message_stop
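To consume these events directly, you can accumulate the reply yourself. A sketch, assuming text arrives as `content_block_delta` events whose `delta.type` is `"text_delta"`:

```python
def collect_text(events):
    """Accumulate streamed text from low-level stream events."""
    parts = []
    for event in events:
        if event.type == "content_block_delta" and event.delta.type == "text_delta":
            parts.append(event.delta.text)
    return "".join(parts)
```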

Vision β€” image input#

import base64

with open("chart.png", "rb") as f:
    image_data = base64.standard_b64encode(f.read()).decode("utf-8")

response = client.messages.create(
    model="claude-opus-4-7",
    max_tokens=1024,
    messages=[{
        "role": "user",
        "content": [
            {
                "type": "image",
                "source": {
                    "type": "base64",
                    "media_type": "image/png",
                    "data": image_data,
                }
            },
            {"type": "text", "text": "What trend does this chart show?"}
        ]
    }]
)
print(response.content[0].text)

Output:

The chart shows a steady upward trend in monthly active users from Q1 2024 through
Q4 2024, with growth accelerating in Q3. The steepest increase occurs between
August and October.
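Building the image block by hand gets verbose; a small helper (hypothetical, not part of the SDK) that guesses the media type from the file extension:

```python
import base64
import mimetypes

SUPPORTED = {"image/jpeg", "image/png", "image/gif", "image/webp"}

def image_block(path):
    """Return a base64 image content block for a local file."""
    media_type, _ = mimetypes.guess_type(path)
    if media_type not in SUPPORTED:
        raise ValueError(f"unsupported image type: {media_type}")
    with open(path, "rb") as f:
        data = base64.standard_b64encode(f.read()).decode("utf-8")
    return {
        "type": "image",
        "source": {"type": "base64", "media_type": media_type, "data": data},
    }
```

The rejection of unsupported types happens before the file is opened, so a bad extension fails fast.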

Image from URL#

response = client.messages.create(
    model="claude-opus-4-7",
    max_tokens=512,
    messages=[{
        "role": "user",
        "content": [
            {
                "type": "image",
                "source": {"type": "url", "url": "https://example.com/diagram.png"}
            },
            {"type": "text", "text": "Describe this diagram."}
        ]
    }]
)

[!NOTE] Supported image formats: image/jpeg, image/png, image/gif, image/webp. Max 5 MB per image. Up to 20 images per request.

Extended thinking#

Extended thinking lets Claude reason step by step before answering; the reasoning is returned as thinking content blocks alongside the final text. Useful for multi-step math, logic, and code review.

response = client.messages.create(
    model="claude-opus-4-7",
    max_tokens=16000,
    thinking={
        "type": "enabled",
        "budget_tokens": 10000
    },
    messages=[{
        "role": "user",
        "content": "A snail crawls 2 cm forward, then 1 cm back each day. Starting 0 cm, how many days to reach 10 cm?"
    }]
)

for block in response.content:
    if block.type == "thinking":
        print(f"[Thinking: {len(block.thinking)} chars]")
    else:
        print(block.text)

Output:

[Thinking: 1843 chars]
The snail reaches 10 cm on day 9.

On days 1–8 it ends each day at 1, 2, 3, 4, 5, 6, 7, 8 cm respectively
(net +1 cm/day). On day 9 it crawls forward 2 cm from 8 cm, reaching 10 cm
before the backward crawl, so day 9 is the answer.
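The day count is easy to sanity-check with a short simulation of the puzzle:

```python
def snail_days(target_cm, forward=2, back=1):
    """Days until the snail's daytime crawl first reaches the target."""
    pos, day = 0, 0
    while True:
        day += 1
        pos += forward            # daytime crawl
        if pos >= target_cm:      # target reached before slipping back
            return day
        pos -= back               # nightly slip

print(snail_days(10))  # 9
```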

[!WARNING] Extended thinking requires temperature=1 (the default). budget_tokens must be ≥ 1024 and less than max_tokens. Thinking tokens are billed as output tokens.

Prompt caching#

Cache large, reused context to cut latency and cost by up to 90% on cache hits. The cache TTL is 5 minutes, refreshed each time the cached prefix is used.

large_docs = "... 50,000 tokens of documentation ..."

response = client.messages.create(
    model="claude-opus-4-7",
    max_tokens=1024,
    system=[
        {"type": "text", "text": "You are a helpful assistant."},
        {
            "type": "text",
            "text": large_docs,
            "cache_control": {"type": "ephemeral"},   # mark for caching
        }
    ],
    messages=[{"role": "user", "content": "What is the retry policy described in the docs?"}]
)
print(response.usage)

Output (first call — writes cache):

Usage(input_tokens=150, output_tokens=82, cache_creation_input_tokens=50000, cache_read_input_tokens=0)

Output (subsequent calls within 5 min):

Usage(input_tokens=150, output_tokens=82, cache_creation_input_tokens=0, cache_read_input_tokens=50000)

[!TIP] Place cache_control on the last content block you want included in the cache prefix. Up to 4 cache breakpoints per request. Cache the longest, most stable content (docs, tool definitions, system prompt) rather than volatile per-request content.
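To see what caching saves, the Usage counters can be folded into one effective-input figure. The multipliers below (1.25× for cache writes, 0.1× for cache reads) are assumptions based on typical cache pricing; check the current pricing page before relying on them:

```python
def effective_input_tokens(usage, write_mult=1.25, read_mult=0.10):
    """Input cost in base-token equivalents, given cache write/read multipliers."""
    return (usage["input_tokens"]
            + write_mult * usage["cache_creation_input_tokens"]
            + read_mult * usage["cache_read_input_tokens"])

first = {"input_tokens": 150, "cache_creation_input_tokens": 50_000, "cache_read_input_tokens": 0}
later = {"input_tokens": 150, "cache_creation_input_tokens": 0, "cache_read_input_tokens": 50_000}
print(effective_input_tokens(first))  # 62650.0
print(effective_input_tokens(later))  # 5150.0
```

Under these assumptions the cached call costs roughly a twelfth of the first one.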

Token counting#

Count tokens before sending to avoid hitting limits or to estimate cost.

count = client.messages.count_tokens(
    model="claude-opus-4-7",
    messages=[{"role": "user", "content": "Explain quantum entanglement in plain English."}]
)
print(count.input_tokens)

Output:

12

# Count tokens including tools and system prompt
# (assumes `tools` and `messages` are defined earlier)
count = client.messages.count_tokens(
    model="claude-opus-4-7",
    system="You are a helpful assistant.",
    tools=tools,
    messages=messages
)
print(f"Estimated input tokens: {count.input_tokens:,}")

Output:

Estimated input tokens: 3,842
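One use for the count is a pre-flight guard against the 200K context window (limit taken from the model reference below; a sketch, not an SDK feature):

```python
CONTEXT_WINDOW = 200_000  # tokens, per the model reference

def check_budget(input_tokens, max_tokens, limit=CONTEXT_WINDOW):
    """Raise before sending if prompt + requested output can't fit the context window."""
    total = input_tokens + max_tokens
    if total > limit:
        raise ValueError(f"{total:,} tokens requested, limit is {limit:,}")
    return total

check_budget(3_842, 1_024)  # fits comfortably
```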

Batch processing#

Process thousands of requests asynchronously at a 50% discount. Results are available within 24 hours.

documents = ["Summary of doc 1...", "Summary of doc 2...", "Summary of doc 3..."]

batch = client.messages.batches.create(
    requests=[
        {
            "custom_id": f"doc-{i}",
            "params": {
                "model": "claude-haiku-4-5",
                "max_tokens": 200,
                "messages": [{"role": "user", "content": f"Summarize in one sentence: {doc}"}]
            }
        }
        for i, doc in enumerate(documents)
    ]
)

print(batch.id)
print(batch.processing_status)

Output:

msgbatch_01XVnKzQp...
in_progress

Poll and retrieve results#

import time

# Poll until done
while True:
    batch = client.messages.batches.retrieve(batch.id)
    if batch.processing_status == "ended":
        break
    time.sleep(60)

# Stream results
for result in client.messages.batches.results(batch.id):
    if result.result.type == "succeeded":
        print(f"{result.custom_id}: {result.result.message.content[0].text}")
    elif result.result.type == "errored":
        print(f"{result.custom_id}: ERROR — {result.result.error}")

Output:

doc-0: The document describes a three-tier caching strategy for web services.
doc-1: The document outlines the company's Q3 financial results showing 18% revenue growth.
doc-2: The document explains how to configure database connection pooling in SQLAlchemy.

Stop sequences#

response = client.messages.create(
    model="claude-opus-4-7",
    max_tokens=512,
    stop_sequences=["```", "---"],
    messages=[{"role": "user", "content": "Write a Python hello world."}]
)
print(response.stop_reason)   # "stop_sequence"
print(response.stop_sequence) # "```"

Error handling#

import anthropic

client = anthropic.Anthropic()

try:
    response = client.messages.create(
        model="claude-opus-4-7",
        max_tokens=1024,
        messages=[{"role": "user", "content": "Hello"}]
    )
except anthropic.AuthenticationError:
    print("Invalid API key")
except anthropic.RateLimitError as e:
    print(f"Rate limited — retry after: {e.response.headers.get('retry-after')}")
except anthropic.APIStatusError as e:
    print(f"API error {e.status_code}: {e.message}")
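The client retries some failures automatically, but for rate limits you may want your own backoff. A generic sketch (the `sleep` parameter is injectable for testing; in real code pass `retry_on=(anthropic.RateLimitError,)`):

```python
import time

def with_retries(fn, max_attempts=5, base_delay=1.0, retry_on=(Exception,), sleep=time.sleep):
    """Call fn(), retrying with exponential backoff on the given exception types."""
    for attempt in range(max_attempts):
        try:
            return fn()
        except retry_on:
            if attempt == max_attempts - 1:
                raise                          # out of attempts: re-raise
            sleep(base_delay * 2 ** attempt)   # 1s, 2s, 4s, ...

# Usage (hypothetical):
# response = with_retries(
#     lambda: client.messages.create(model="claude-opus-4-7", max_tokens=256,
#                                    messages=[{"role": "user", "content": "Hi"}]),
#     retry_on=(anthropic.RateLimitError,),
# )
```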

Async client#

import asyncio
import anthropic

async def main():
    async with anthropic.AsyncAnthropic() as client:
        response = await client.messages.create(
            model="claude-opus-4-7",
            max_tokens=1024,
            messages=[{"role": "user", "content": "What is async/await?"}]
        )
        print(response.content[0].text)

asyncio.run(main())

Output:

Async/await is Python syntax for writing asynchronous code that looks synchronous.
`async def` marks a coroutine function; `await` suspends it until a result is ready
without blocking the event loop, letting other tasks run in the meantime.
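The async client makes it easy to fan several prompts out at once; a bounded-gather helper keeps concurrency under control. A generic sketch (not an SDK feature; `coro_fns` are zero-argument callables returning coroutines, e.g. lambdas wrapping client.messages.create):

```python
import asyncio

async def bounded_gather(coro_fns, limit=5):
    """Run coroutine factories with at most `limit` in flight at once."""
    sem = asyncio.Semaphore(limit)

    async def run(fn):
        async with sem:
            return await fn()

    # gather preserves input order in its results
    return await asyncio.gather(*(run(fn) for fn in coro_fns))
```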

Model reference#

| Model | Context | Best for |
| --- | --- | --- |
| claude-opus-4-7 | 200K | Complex reasoning, code, analysis |
| claude-sonnet-4-6 | 200K | Balanced quality and speed |
| claude-haiku-4-5 | 200K | Fast, low-cost, high-volume tasks |

[!TIP] Use claude-haiku-4-5 for batch jobs, classification, and summarization where cost matters. Use claude-opus-4-7 for agentic tasks, code generation, and complex reasoning. claude-sonnet-4-6 is a good default for interactive applications.

Environment setup#

# Set API key (preferred — never hardcode)
export ANTHROPIC_API_KEY="sk-ant-..."

# Optional: custom base URL (e.g. proxy)
export ANTHROPIC_BASE_URL="https://my-proxy.example.com"

# Or pass explicitly in Python
client = anthropic.Anthropic(api_key="sk-ant-...")

# Or use a different base URL
client = anthropic.Anthropic(
    api_key="sk-ant-...",
    base_url="https://my-proxy.example.com"
)