# Claude API – Python
## Install

```shell
pip install anthropic
```

Output:

```
Successfully installed anthropic-0.49.0
```
## Basic message

```python
import anthropic

client = anthropic.Anthropic()  # reads ANTHROPIC_API_KEY from env

response = client.messages.create(
    model="claude-opus-4-7",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Explain what a Python decorator is."}]
)
print(response.content[0].text)
print(response.usage)
```

Output:

```
A Python decorator is a function that takes another function as input and returns a
modified version of it, allowing you to add behavior before or after the original
function runs without changing its source code. The @ syntax is shorthand for
function = decorator(function).

Usage(input_tokens=15, output_tokens=62, cache_creation_input_tokens=0, cache_read_input_tokens=0)
```
## Response object

```python
print(response.id)           # "msg_01XVn..."
print(response.model)        # "claude-opus-4-7-20251001"
print(response.stop_reason)  # "end_turn" | "max_tokens" | "tool_use" | "stop_sequence"
print(response.usage.input_tokens)
print(response.usage.output_tokens)

# Content blocks
for block in response.content:
    print(block.type)  # "text"
    print(block.text)
```
## Multi-turn conversation

```python
messages = [
    {"role": "user", "content": "What is 2 + 2?"},
    {"role": "assistant", "content": "4"},
    {"role": "user", "content": "Multiply that by 10."}
]

response = client.messages.create(
    model="claude-opus-4-7",
    max_tokens=256,
    messages=messages
)
print(response.content[0].text)
```

Output:

```
40
```
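The API is stateless, so each request must resend the full conversation history. A minimal sketch of a helper that grows the history after each exchange (the helper name is this snippet's own, not part of the SDK):

```python
def record_exchange(history, user_text, assistant_text):
    """Append one user/assistant exchange to the message history in place.

    In practice, assistant_text comes from response.content[0].text.
    """
    history.append({"role": "user", "content": user_text})
    history.append({"role": "assistant", "content": assistant_text})
    return history

history = []
record_exchange(history, "What is 2 + 2?", "4")
# Next turn: append the new user message and pass history as messages=...
```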
## System prompt

```python
response = client.messages.create(
    model="claude-opus-4-7",
    max_tokens=512,
    system="You are a concise technical documentation writer. Reply in bullet points only.",
    messages=[{"role": "user", "content": "How does TCP handle packet loss?"}]
)
print(response.content[0].text)
```

Output:

```
- Sender sets a retransmission timer when a segment is sent
- If ACK not received before timeout, the segment is retransmitted
- Receiver uses sequence numbers to detect duplicates and reorder out-of-order segments
- Duplicate ACKs (3 in a row) trigger fast retransmit before the timer expires
- Congestion window is reduced on loss to slow the send rate
```
## Streaming

Use `client.messages.stream()` to receive tokens as they arrive.

```python
with client.messages.stream(
    model="claude-opus-4-7",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Count to 5 slowly."}]
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)
    print()

    # Get the full message after the stream completes
    message = stream.get_final_message()

print(f"\nTotal tokens: {message.usage.input_tokens + message.usage.output_tokens}")
```

Output:

```
One... two... three... four... five.

Total tokens: 31
```
### Low-level streaming events

```python
with client.messages.stream(
    model="claude-opus-4-7",
    max_tokens=256,
    messages=[{"role": "user", "content": "Hi"}]
) as stream:
    for event in stream:
        print(event.type)
```

Output:

```
message_start
content_block_start
content_block_delta
content_block_delta
content_block_stop
message_delta
message_stop
```
## Vision – image input

```python
import base64

with open("chart.png", "rb") as f:
    image_data = base64.standard_b64encode(f.read()).decode("utf-8")

response = client.messages.create(
    model="claude-opus-4-7",
    max_tokens=1024,
    messages=[{
        "role": "user",
        "content": [
            {
                "type": "image",
                "source": {
                    "type": "base64",
                    "media_type": "image/png",
                    "data": image_data,
                }
            },
            {"type": "text", "text": "What trend does this chart show?"}
        ]
    }]
)
print(response.content[0].text)
```

Output:

```
The chart shows a steady upward trend in monthly active users from Q1 2024 through
Q4 2024, with growth accelerating in Q3. The steepest increase occurs between
August and October.
```
### Image from URL

```python
response = client.messages.create(
    model="claude-opus-4-7",
    max_tokens=512,
    messages=[{
        "role": "user",
        "content": [
            {
                "type": "image",
                "source": {"type": "url", "url": "https://example.com/diagram.png"}
            },
            {"type": "text", "text": "Describe this diagram."}
        ]
    }]
)
```

> [!NOTE]
> Supported image formats: `image/jpeg`, `image/png`, `image/gif`, `image/webp`. Max 5 MB per image. Up to 20 images per request.
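Those constraints can be enforced before building the request. A small sketch of a helper that derives `media_type` from the filename and checks the size limit (the helper and constant names are illustrative, not SDK API):

```python
from pathlib import Path

SUPPORTED_MEDIA_TYPES = {
    ".jpg": "image/jpeg",
    ".jpeg": "image/jpeg",
    ".png": "image/png",
    ".gif": "image/gif",
    ".webp": "image/webp",
}
MAX_IMAGE_BYTES = 5 * 1024 * 1024  # 5 MB per image

def media_type_for(path, size_bytes):
    """Return the media_type for a supported image file, or raise ValueError."""
    media_type = SUPPORTED_MEDIA_TYPES.get(Path(path).suffix.lower())
    if media_type is None:
        raise ValueError(f"unsupported image format: {path}")
    if size_bytes > MAX_IMAGE_BYTES:
        raise ValueError(f"image exceeds 5 MB: {path}")
    return media_type

print(media_type_for("chart.png", 120_000))  # image/png
```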
## Extended thinking

Extended thinking lets Claude reason privately before responding. Useful for multi-step math, logic, and code review.

```python
response = client.messages.create(
    model="claude-opus-4-7",
    max_tokens=16000,
    thinking={
        "type": "enabled",
        "budget_tokens": 10000
    },
    messages=[{
        "role": "user",
        "content": "A snail crawls 2 cm forward, then 1 cm back each day. Starting at 0 cm, how many days to reach 10 cm?"
    }]
)

for block in response.content:
    if block.type == "thinking":
        print(f"[Thinking: {len(block.thinking)} chars]")
    else:
        print(block.text)
```

Output:

```
[Thinking: 1843 chars]
The snail reaches 10 cm on day 9.
On days 1-8 it ends each day at 1, 2, 3, 4, 5, 6, 7, 8 cm respectively
(net +1 cm/day). On day 9 it crawls forward 2 cm from 8 cm, reaching 10 cm
before the backward crawl, so day 9 is the answer.
```

> [!WARNING]
> Extended thinking requires `temperature=1` (the default). `budget_tokens` must be ≥ 1024. You are billed for thinking tokens.
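A sketch of a local guard that checks those constraints before sending (this helper is this snippet's own convention, not SDK behavior):

```python
MIN_THINKING_BUDGET = 1024

def thinking_config(budget_tokens, max_tokens):
    """Build a thinking config, enforcing budget >= 1024 and budget < max_tokens."""
    if budget_tokens < MIN_THINKING_BUDGET:
        raise ValueError("budget_tokens must be >= 1024")
    if budget_tokens >= max_tokens:
        raise ValueError("budget_tokens must be less than max_tokens")
    return {"type": "enabled", "budget_tokens": budget_tokens}

print(thinking_config(10_000, 16_000))
```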
## Prompt caching

Cache large, reused context to cut latency and cost by up to 90% on cache hits. TTL is 5 minutes.

```python
large_docs = "... 50,000 tokens of documentation ..."

response = client.messages.create(
    model="claude-opus-4-7",
    max_tokens=1024,
    system=[
        {"type": "text", "text": "You are a helpful assistant."},
        {
            "type": "text",
            "text": large_docs,
            "cache_control": {"type": "ephemeral"},  # mark for caching
        }
    ],
    messages=[{"role": "user", "content": "What is the retry policy described in the docs?"}]
)
print(response.usage)
```

Output (first call – writes cache):

```
Usage(input_tokens=150, output_tokens=82, cache_creation_input_tokens=50000, cache_read_input_tokens=0)
```

Output (subsequent calls within 5 min):

```
Usage(input_tokens=150, output_tokens=82, cache_creation_input_tokens=0, cache_read_input_tokens=50000)
```

> [!TIP]
> Place `cache_control` on the last content block you want included in the cache prefix. Up to 4 cache breakpoints per request. Cache the longest, most stable content (docs, tool definitions, system prompt) rather than volatile per-request content.
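That placement rule can be sketched as a helper that assembles a system list with the cache breakpoint on the last stable block, keeping volatile content after it (the helper name and the example strings are illustrative):

```python
def cached_system(stable_blocks, volatile_blocks=()):
    """Build a system list: stable content first, cache_control on its last block."""
    system = [{"type": "text", "text": t} for t in stable_blocks]
    if system:
        system[-1]["cache_control"] = {"type": "ephemeral"}
    system.extend({"type": "text", "text": t} for t in volatile_blocks)
    return system

system = cached_system(
    ["You are a helpful assistant.", "... 50,000 tokens of documentation ..."],
    ["Today's date: 2025-01-01"],  # volatile, kept out of the cached prefix
)
```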
## Token counting

Count tokens before sending to avoid hitting limits or to estimate cost.

```python
count = client.messages.count_tokens(
    model="claude-opus-4-7",
    messages=[{"role": "user", "content": "Explain quantum entanglement in plain English."}]
)
print(count.input_tokens)
```

Output:

```
12
```

```python
# Count tokens including tools and system prompt
# (tools and messages defined elsewhere)
count = client.messages.count_tokens(
    model="claude-opus-4-7",
    system="You are a helpful assistant.",
    tools=tools,
    messages=messages
)
print(f"Estimated input tokens: {count.input_tokens:,}")
```

Output:

```
Estimated input tokens: 3,842
```
## Batch processing

Process thousands of prompts at 50% of the standard cost. Results are ready within 24 hours.

```python
documents = ["Summary of doc 1...", "Summary of doc 2...", "Summary of doc 3..."]

batch = client.messages.batches.create(
    requests=[
        {
            "custom_id": f"doc-{i}",
            "params": {
                "model": "claude-haiku-4-5",
                "max_tokens": 200,
                "messages": [{"role": "user", "content": f"Summarize in one sentence: {doc}"}]
            }
        }
        for i, doc in enumerate(documents)
    ]
)
print(batch.id)
print(batch.processing_status)
```

Output:

```
msgbatch_01XVnKzQp...
in_progress
```
### Poll and retrieve results

```python
import time

# Poll until done
while True:
    batch = client.messages.batches.retrieve(batch.id)
    if batch.processing_status == "ended":
        break
    time.sleep(60)

# Stream results
for result in client.messages.batches.results(batch.id):
    if result.result.type == "succeeded":
        print(f"{result.custom_id}: {result.result.message.content[0].text}")
    elif result.result.type == "errored":
        print(f"{result.custom_id}: ERROR - {result.result.error}")
```

Output:

```
doc-0: The document describes a three-tier caching strategy for web services.
doc-1: The document outlines the company's Q3 financial results showing 18% revenue growth.
doc-2: The document explains how to configure database connection pooling in SQLAlchemy.
```
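A bare `while True` loop waits forever if a batch never ends. A generic polling sketch with a deadline (this helper is not part of the SDK):

```python
import time

def poll_until(check, interval=60.0, timeout=24 * 3600):
    """Call check() every `interval` seconds until it returns a truthy value.

    Returns that value, or raises TimeoutError once `timeout` seconds elapse.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        result = check()
        if result:
            return result
        time.sleep(interval)
    raise TimeoutError("polling timed out")

# Usage with a batch (sketch):
# def batch_done():
#     b = client.messages.batches.retrieve(batch.id)
#     return b if b.processing_status == "ended" else None
# batch = poll_until(batch_done, interval=60)
```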
## Stop sequences

```python
response = client.messages.create(
    model="claude-opus-4-7",
    max_tokens=512,
    stop_sequences=["```", "---"],
    messages=[{"role": "user", "content": "Write a Python hello world."}]
)
print(response.stop_reason)    # "stop_sequence"
print(response.stop_sequence)  # "```"
```
## Error handling

```python
import anthropic

client = anthropic.Anthropic()

try:
    response = client.messages.create(
        model="claude-opus-4-7",
        max_tokens=1024,
        messages=[{"role": "user", "content": "Hello"}]
    )
except anthropic.AuthenticationError:
    print("Invalid API key")
except anthropic.RateLimitError as e:
    print(f"Rate limited - retry after: {e.response.headers.get('retry-after')}")
except anthropic.APIStatusError as e:
    print(f"API error {e.status_code}: {e.message}")
```
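The SDK also retries some failures automatically (configurable via the client's `max_retries` setting). For manual control, a generic exponential-backoff sketch (the helper name is this snippet's own; the commented usage is an assumption about which exceptions you would treat as retryable):

```python
import random
import time

def with_backoff(call, max_attempts=5, base_delay=1.0, retryable=(Exception,)):
    """Run call(), retrying retryable errors with exponential backoff and jitter."""
    for attempt in range(max_attempts):
        try:
            return call()
        except retryable:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            time.sleep(base_delay * 2 ** attempt + random.uniform(0, base_delay))

# Usage (sketch): retry only transient failures
# response = with_backoff(
#     lambda: client.messages.create(...),
#     retryable=(anthropic.RateLimitError, anthropic.APIConnectionError),
# )
```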
## Async client

```python
import asyncio
import anthropic

async def main():
    async with anthropic.AsyncAnthropic() as client:
        response = await client.messages.create(
            model="claude-opus-4-7",
            max_tokens=1024,
            messages=[{"role": "user", "content": "What is async/await?"}]
        )
        print(response.content[0].text)

asyncio.run(main())
```

Output:

```
Async/await is Python syntax for writing asynchronous code that looks synchronous.
`async def` marks a coroutine function; `await` suspends it until a result is ready
without blocking the event loop, letting other tasks run in the meantime.
```
## Model reference

| Model | Context | Best for |
|---|---|---|
| `claude-opus-4-7` | 200K | Complex reasoning, code, analysis |
| `claude-sonnet-4-6` | 200K | Balanced quality and speed |
| `claude-haiku-4-5` | 200K | Fast, low-cost, high-volume tasks |

> [!TIP]
> Use `claude-haiku-4-5` for batch jobs, classification, and summarization where cost matters. Use `claude-opus-4-7` for agentic tasks, code generation, and complex reasoning. `claude-sonnet-4-6` is a good default for interactive applications.
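That guidance can be captured as a simple lookup so model choice lives in one place (the tier names are this snippet's own convention):

```python
MODEL_FOR_TASK = {
    "complex": "claude-opus-4-7",    # agentic tasks, code generation, deep reasoning
    "default": "claude-sonnet-4-6",  # interactive applications
    "bulk": "claude-haiku-4-5",      # batch jobs, classification, summarization
}

def pick_model(task_tier="default"):
    """Return the model id for a task tier; raises KeyError for unknown tiers."""
    return MODEL_FOR_TASK[task_tier]

print(pick_model("bulk"))  # claude-haiku-4-5
```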
## Environment setup

```shell
# Set API key (preferred - never hardcode)
export ANTHROPIC_API_KEY="sk-ant-..."

# Optional: custom base URL (e.g. a proxy)
export ANTHROPIC_BASE_URL="https://my-proxy.example.com"
```

```python
# Or pass explicitly
client = anthropic.Anthropic(api_key="sk-ant-...")

# Or use a different base URL
client = anthropic.Anthropic(
    api_key="sk-ant-...",
    base_url="https://my-proxy.example.com"
)
```