Python SDK
Official Klime SDK for Python. Track events, identify users, and associate them with groups.
Installation
pip install klime
Quick Start
from klime import KlimeClient
client = KlimeClient(
    write_key='your-write-key'
)
# Identify a user
client.identify('user_123', {
    'email': 'user@example.com',
    'name': 'Stefan'
})
# Track an event
client.track('Button Clicked', {
    'button_name': 'Sign up',
    'plan': 'pro'
}, user_id='user_123')
# Associate user with a group and set group traits
client.group('org_456', {
    'name': 'Acme Inc',
    'plan': 'enterprise'
}, user_id='user_123')
# Or just link the user to a group (if traits are already set)
client.group('org_456', user_id='user_123')
# Shutdown gracefully
client.shutdown()
Installation Prompt
Copy and paste this prompt into Cursor, Copilot, or your favorite AI editor to integrate Klime:
Integrate Klime for customer analytics. Klime tracks user activity to identify which customers are healthy vs at risk of churning.
ANALYTICS MODES (determine which applies):
- Companies & Teams: Your customers are companies with multiple team members (SaaS, enterprise tools)
→ Use identify() + group() + track()
- Individual Customers: Your customers are individuals with private accounts (consumer apps, creator tools)
→ Use identify() + track() only (no group() needed)
KEY CONCEPTS:
- Every track() call requires either user_id OR group_id (no anonymous events)
- Use group_id alone for org-level events (webhooks, cron jobs, system metrics)
- group() links a user to a company AND sets company traits (only for Companies & Teams mode)
- Order doesn't matter - events before identify/group still get attributed correctly
BEST PRACTICES:
- Initialize client ONCE at app startup (singleton pattern)
- Store write key in KLIME_WRITE_KEY environment variable
- Call shutdown() on process exit to flush remaining events (auto-registered via atexit)
Install: pip install klime
import os
from klime import KlimeClient
client = KlimeClient(write_key=os.environ['KLIME_WRITE_KEY'])
# Identify users at signup/login:
client.identify('usr_abc123', {'email': 'jane@acme.com', 'name': 'Jane Smith'})
# Track key activities:
client.track('Report Generated', {'report_type': 'revenue'}, user_id='usr_abc123')
client.track('Feature Used', {'feature': 'export', 'format': 'csv'}, user_id='usr_abc123')
client.track('Teammate Invited', {'role': 'member'}, user_id='usr_abc123')
# If Companies & Teams mode: link user to their company and set company traits
client.group('org_456', {'name': 'Acme Inc', 'plan': 'enterprise'}, user_id='usr_abc123')
INTEGRATION WORKFLOW:
Phase 1: Discover
Explore the codebase to understand:
1. What framework is used? (Django, Flask, FastAPI, Starlette, etc.)
2. Where is user identity available? (e.g., request.user.id, current_user.id, g.user, Depends() injection)
3. Is this Companies & Teams or Individual Customers?
- Look for: organization, workspace, tenant, team, account models → Companies & Teams (use group())
- No company/org concept, just individual users → Individual Customers (skip group())
4. Where do core user actions happen? (views, routes, API endpoints, services)
5. Is there existing analytics? (search: segment, posthog, mixpanel, amplitude, track)
Match your integration style to the framework's conventions.
Phase 2: Instrument
Add these calls using idiomatic patterns for the framework:
- Initialize client once (Django: apps.py/settings, Flask: app factory, FastAPI: lifespan/startup)
- identify() in auth/login success handler
- group() when user-org association is established (Companies & Teams mode only)
- track() for key user actions (see below)
WHAT TO TRACK:
Active engagement (primary): feature usage, resource creation, collaboration, completing flows
Session signals (secondary): login/session start, dashboard access - distinguishes "low usage" from "churned"
Do NOT track: every request, health checks, middleware passthrough, background tasks
Phase 3: Verify
Confirm: client initialized, shutdown handled, identify/group/track calls added
Phase 4: Summarize
Report what you added:
- Files modified and what was added to each
- Events being tracked (list event names and what triggers them)
- How user_id is obtained (and group_id if Companies & Teams mode)
- Any assumptions made or questions
API Reference
Constructor
KlimeClient(
    write_key: str,                            # Required: Your Klime write key
    endpoint: Optional[str] = None,            # Optional: API endpoint (default: https://i.klime.com)
    flush_interval: Optional[int] = None,      # Optional: Milliseconds between flushes (default: 2000)
    max_batch_size: Optional[int] = None,      # Optional: Max events per batch (default: 20, max: 100)
    max_queue_size: Optional[int] = None,      # Optional: Max queued events (default: 1000)
    retry_max_attempts: Optional[int] = None,  # Optional: Max retry attempts (default: 5)
    retry_initial_delay: Optional[int] = None, # Optional: Initial retry delay in ms (default: 1000)
    flush_on_shutdown: Optional[bool] = None,  # Optional: Auto-flush on exit (default: True)
    logger: Optional[logging.Logger] = None,   # Optional: Custom logger (default: logging.getLogger("klime"))
    on_error: Optional[Callable] = None,       # Optional: Callback for batch failures
    on_success: Optional[Callable] = None      # Optional: Callback for successful sends
)
Methods
track(event: str, properties: Optional[Dict] = None, user_id: Optional[str] = None, group_id: Optional[str] = None) -> None
Track an event. Events can be attributed in two ways:
- User events: Provide user_id to track user activity (most common)
- Group events: Provide group_id without user_id for organization-level events
# User event (most common)
client.track('Button Clicked', {
    'button_name': 'Sign up',
    'plan': 'pro'
}, user_id='user_123')
# Group event (for webhooks, cron jobs, system events)
client.track('Events Received', {
    'count': 100,
    'source': 'webhook'
}, group_id='org_456')
Note: The group_id parameter can also be combined with user_id for multi-tenant scenarios where you need to specify which organization context a user event occurred in.
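The installation prompt states that every track() call needs a user_id or a group_id. A minimal sketch of checking this before an event is queued, assuming a hypothetical helper named resolve_attribution (not part of the SDK) that only builds the keyword arguments you would pass to track():

```python
from typing import Dict, Optional


def resolve_attribution(user_id: Optional[str] = None,
                        group_id: Optional[str] = None) -> Dict[str, str]:
    """Return keyword arguments for track(), rejecting anonymous events.

    Hypothetical helper for illustration; not part of the Klime SDK.
    """
    kwargs: Dict[str, str] = {}
    if user_id:
        kwargs["user_id"] = user_id
    if group_id:
        kwargs["group_id"] = group_id
    if not kwargs:
        raise ValueError("track() needs a user_id, a group_id, or both")
    return kwargs


# Usage with an existing client (requires the klime package):
# client.track('Events Received', {'count': 100},
#              **resolve_attribution(group_id='org_456'))
```

Failing early in your own code surfaces a missing identifier at the call site instead of in a background thread.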
identify(user_id: str, traits: Optional[Dict] = None) -> None
Identify a user with traits.
client.identify('user_123', {
    'email': 'user@example.com',
    'name': 'Stefan'
})
group(group_id: str, traits: Optional[Dict] = None, user_id: Optional[str] = None) -> None
Associate a user with a group and/or set group traits.
# Associate user with a group and set group traits (most common)
client.group('org_456', {
    'name': 'Acme Inc',
    'plan': 'enterprise'
}, user_id='user_123')
# Just link a user to a group (traits already set or not needed)
client.group('org_456', user_id='user_123')
# Just update group traits (e.g., from a webhook or background job)
client.group('org_456', {
    'plan': 'enterprise',
    'employee_count': 50
})
flush() -> None
Manually flush queued events immediately.
client.flush()
shutdown() -> None
Gracefully shutdown the client, flushing remaining events.
client.shutdown()
queue_size() -> int
Return the number of events currently in the queue.
pending = client.queue_size()
print(f"{pending} events waiting to be sent")
Synchronous Methods
For cases where you need confirmation that events were sent (e.g., before process exit, in tests), use the synchronous variants:
track_sync(event, properties, user_id, group_id) -> BatchResponse
Track an event synchronously. Raises SendError on failure.
from klime import SendError
try:
    response = client.track_sync('Critical Action', {'key': 'value'}, user_id='user_123')
    print(f"Sent! Accepted: {response.accepted}")
except SendError as e:
    print(f"Failed to send: {e.message}")
identify_sync(user_id, traits) -> BatchResponse
Identify a user synchronously. Raises SendError on failure.
group_sync(group_id, traits, user_id) -> BatchResponse
Associate a user with a group synchronously. Raises SendError on failure.
Features
- Automatic Batching: Events are automatically batched and sent every 2 seconds or when the batch size reaches 20 events
- Automatic Retries: Failed requests are automatically retried with exponential backoff
- Thread-Safe: Safe to use from multiple threads
- Process Exit Handling: Automatically flushes events on process exit (via atexit)
- Zero Dependencies: Uses only Python standard library
Performance
When you call track(), identify(), or group(), the SDK:
- Adds the event to a thread-safe queue (microseconds)
- Returns immediately without waiting for network I/O
Events are sent to Klime's servers in background threads. This means:
- No network blocking: HTTP requests happen asynchronously in background threads
- No latency impact: Tracking calls add < 1ms to your request handling time
- Automatic batching: Events are queued and sent in batches (default: every 2 seconds or 20 events)
# This returns immediately - no HTTP request is made here
client.track('Button Clicked', {'button': 'signup'}, user_id='user_123')
# Your code continues without waiting
return {'success': True}
Apart from the *_sync() variants, the only blocking operation is flush(), which waits for all queued events to be sent. It is typically only called during graceful shutdown.
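The queue-then-batch behaviour described in this section can be pictured with a small stand-in. This is a sketch of the general pattern, not the SDK's actual implementation: the class BatchingQueue, its method names, and the polling loop are all assumptions made for illustration.

```python
import queue
import threading
import time
from typing import Callable, List


class BatchingQueue:
    """Illustrative stand-in for a batching sender; names are hypothetical."""

    def __init__(self, send: Callable[[List[dict]], None],
                 flush_interval: float = 2.0, max_batch_size: int = 20):
        self._queue = queue.Queue()
        self._send = send
        self._flush_interval = flush_interval
        self._max_batch_size = max_batch_size
        self._stop = threading.Event()
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()

    def enqueue(self, event: dict) -> None:
        # Runs on the caller's thread: a queue put only, no network I/O.
        self._queue.put(event)

    def _drain(self) -> List[dict]:
        # Take up to max_batch_size events without blocking.
        batch: List[dict] = []
        while len(batch) < self._max_batch_size:
            try:
                batch.append(self._queue.get_nowait())
            except queue.Empty:
                break
        return batch

    def _run(self) -> None:
        while not self._stop.is_set():
            deadline = time.monotonic() + self._flush_interval
            # Wait until the interval elapses or a full batch is queued.
            while (time.monotonic() < deadline
                   and self._queue.qsize() < self._max_batch_size
                   and not self._stop.is_set()):
                self._stop.wait(0.01)
            batch = self._drain()
            if batch:
                self._send(batch)

    def shutdown(self) -> None:
        # Stop the worker, then send whatever is still queued.
        self._stop.set()
        self._worker.join()
        while True:
            batch = self._drain()
            if not batch:
                break
            self._send(batch)
```

In this sketch, enqueue() plays the role of track(), and the send callable stands in for the HTTP request made in the background.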
Configuration
Default Values
- flush_interval: 2000ms
- max_batch_size: 20 events
- max_queue_size: 1000 events
- retry_max_attempts: 5 attempts
- retry_initial_delay: 1000ms
- flush_on_shutdown: True
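One way to override these defaults per environment is to read them from environment variables and pass only the values that are set. A sketch under stated assumptions: the helper klime_options and every variable name other than KLIME_WRITE_KEY are hypothetical, not something the SDK reads on its own.

```python
import os
from typing import Any, Dict, Mapping

# Variable names other than KLIME_WRITE_KEY are hypothetical.
_INT_OPTIONS = {
    "flush_interval": "KLIME_FLUSH_INTERVAL_MS",
    "max_batch_size": "KLIME_MAX_BATCH_SIZE",
    "max_queue_size": "KLIME_MAX_QUEUE_SIZE",
}


def klime_options(env: Mapping[str, str] = os.environ) -> Dict[str, Any]:
    """Build keyword arguments for KlimeClient from environment variables.

    Only options that are set are returned, so the documented defaults
    apply to everything else.
    """
    options: Dict[str, Any] = {"write_key": env["KLIME_WRITE_KEY"]}
    for name, var in _INT_OPTIONS.items():
        if var in env:
            options[name] = int(env[var])
    # The constructor reference lists 100 as the maximum batch size.
    if options.get("max_batch_size", 0) > 100:
        raise ValueError("max_batch_size must not exceed 100")
    return options


# Usage (requires the klime package):
# client = KlimeClient(**klime_options())
```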
Logging
The SDK uses Python's built-in logging module. By default, it logs to logging.getLogger("klime").
import logging
# Enable debug logging
logging.getLogger("klime").setLevel(logging.DEBUG)
# Or provide a custom logger
client = KlimeClient(
write_key='your-write-key',
logger=logging.getLogger("myapp.klime")
)
For Django/Flask, the SDK automatically integrates with your app's logging configuration.
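Because the SDK logs through the standard logging module, the "klime" logger can be configured like any other logger. A sketch using logging.config.dictConfig; the handler and formatter names are illustrative, and the same dictionary shape is what Django's LOGGING setting expects.

```python
import logging
import logging.config

logging.config.dictConfig({
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "plain": {"format": "%(asctime)s %(name)s %(levelname)s %(message)s"},
    },
    "handlers": {
        "console": {"class": "logging.StreamHandler", "formatter": "plain"},
    },
    "loggers": {
        # "klime" is the SDK's default logger name.
        "klime": {"handlers": ["console"], "level": "DEBUG", "propagate": False},
    },
})
```

Setting propagate to False keeps SDK debug output out of the root logger's handlers.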
Callbacks
import sentry_sdk  # example error tracker; requires the sentry-sdk package
from klime import KlimeClient

def handle_error(error, events):
    # Report to your error tracking service
    sentry_sdk.capture_exception(error)
    print(f"Failed to send {len(events)} events: {error}")

def handle_success(response):
    print(f"Sent {response.accepted} events")

client = KlimeClient(
    write_key='your-write-key',
    on_error=handle_error,
    on_success=handle_success
)
Error Handling
The SDK automatically handles:
- Transient errors (429, 503, network failures): Retries with exponential backoff
- Permanent errors (400, 401): Logs error and drops event
- Rate limiting: Respects the Retry-After header
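The retry timing described above can be sketched as a small function. This assumes the delay doubles on each attempt and that Retry-After arrives as a number of seconds; the SDK's real multiplier, any jitter, and its handling of date-formatted Retry-After values are not documented here, and retry_delay_ms is a hypothetical name.

```python
from typing import Optional


def retry_delay_ms(attempt: int, initial_delay_ms: int = 1000,
                   retry_after_s: Optional[float] = None) -> int:
    """Delay in milliseconds before retry number `attempt` (1-based).

    Assumes a doubling backoff; not the SDK's confirmed schedule.
    """
    if retry_after_s is not None:
        # A server-provided Retry-After value (seconds) takes precedence.
        return int(retry_after_s * 1000)
    return initial_delay_ms * 2 ** (attempt - 1)


# With the default retry_initial_delay of 1000 ms and 5 attempts:
delays = [retry_delay_ms(n) for n in range(1, 6)]
# delays == [1000, 2000, 4000, 8000, 16000]
```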
For synchronous operations, use *_sync() methods which raise SendError on failure:
from klime import KlimeClient, SendError
try:
    response = client.track_sync('Event', user_id='user_123')
except SendError as e:
    print(f"Failed: {e.message}, events: {len(e.events)}")