Python

Python SDK

Official Klime SDK for Python. Track events, identify users, and associate them with groups.

Installation

pip install klime

Quick Start

from klime import KlimeClient

client = KlimeClient(
    write_key='your-write-key'
)

# Identify a user
client.identify('user_123', {
    'email': 'user@example.com',
    'name': 'Stefan'
})

# Track an event
client.track('Button Clicked', {
    'button_name': 'Sign up',
    'plan': 'pro'
}, user_id='user_123')

# Associate user with a group and set group traits
client.group('org_456', {
    'name': 'Acme Inc',
    'plan': 'enterprise'
}, user_id='user_123')

# Or just link the user to a group (if traits are already set)
client.group('org_456', user_id='user_123')

# Shutdown gracefully
client.shutdown()

Installation Prompt

Copy and paste this prompt into Cursor, Copilot, or your favorite AI editor to integrate Klime:

Integrate Klime for customer analytics. Klime tracks user activity to identify which customers are healthy vs at risk of churning.

ANALYTICS MODES (determine which applies):
- Companies & Teams: Your customers are companies with multiple team members (SaaS, enterprise tools)
  → Use identify() + group() + track()
- Individual Customers: Your customers are individuals with private accounts (consumer apps, creator tools)
  → Use identify() + track() only (no group() needed)

KEY CONCEPTS:
- Every track() call requires either user_id OR group_id (no anonymous events)
- Use group_id alone for org-level events (webhooks, cron jobs, system metrics)
- group() links a user to a company AND sets company traits (only for Companies & Teams mode)
- Order doesn't matter - events before identify/group still get attributed correctly

BEST PRACTICES:
- Initialize client ONCE at app startup (singleton pattern)
- Store write key in KLIME_WRITE_KEY environment variable
- Call shutdown() on process exit to flush remaining events (auto-registered via atexit)

Install: pip install klime

import os
from klime import KlimeClient

client = KlimeClient(write_key=os.environ['KLIME_WRITE_KEY'])

# Identify users at signup/login:
client.identify('usr_abc123', {'email': 'jane@acme.com', 'name': 'Jane Smith'})

# Track key activities:
client.track('Report Generated', {'report_type': 'revenue'}, user_id='usr_abc123')
client.track('Feature Used', {'feature': 'export', 'format': 'csv'}, user_id='usr_abc123')
client.track('Teammate Invited', {'role': 'member'}, user_id='usr_abc123')

# If Companies & Teams mode: link user to their company and set company traits
client.group('org_456', {'name': 'Acme Inc', 'plan': 'enterprise'}, user_id='usr_abc123')

INTEGRATION WORKFLOW:

Phase 1: Discover
Explore the codebase to understand:
1. What framework is used? (Django, Flask, FastAPI, Starlette, etc.)
2. Where is user identity available? (e.g., request.user.id, current_user.id, g.user, Depends() injection)
3. Is this Companies & Teams or Individual Customers?
   - Look for: organization, workspace, tenant, team, account models → Companies & Teams (use group())
   - No company/org concept, just individual users → Individual Customers (skip group())
4. Where do core user actions happen? (views, routes, API endpoints, services)
5. Is there existing analytics? (search: segment, posthog, mixpanel, amplitude, track)
Match your integration style to the framework's conventions.

Phase 2: Instrument
Add these calls using idiomatic patterns for the framework:
- Initialize client once (Django: apps.py/settings, Flask: app factory, FastAPI: lifespan/startup)
- identify() in auth/login success handler
- group() when user-org association is established (Companies & Teams mode only)
- track() for key user actions (see below)

WHAT TO TRACK:
Active engagement (primary): feature usage, resource creation, collaboration, completing flows
Session signals (secondary): login/session start, dashboard access - distinguishes "low usage" from "churned"
Do NOT track: every request, health checks, middleware passthrough, background tasks

Phase 3: Verify
Confirm: client initialized, shutdown handled, identify/group/track calls added

Phase 4: Summarize
Report what you added:
- Files modified and what was added to each
- Events being tracked (list event names and what triggers them)
- How user_id is obtained (and group_id if Companies & Teams mode)
- Any assumptions made or questions

API Reference

Constructor

KlimeClient(
    write_key: str,                    # Required: Your Klime write key
    endpoint: Optional[str] = None,    # Optional: API endpoint (default: https://i.klime.com)
    flush_interval: Optional[int] = None,      # Optional: Milliseconds between flushes (default: 2000)
    max_batch_size: Optional[int] = None,     # Optional: Max events per batch (default: 20, max: 100)
    max_queue_size: Optional[int] = None,      # Optional: Max queued events (default: 1000)
    retry_max_attempts: Optional[int] = None, # Optional: Max retry attempts (default: 5)
    retry_initial_delay: Optional[int] = None, # Optional: Initial retry delay in ms (default: 1000)
    flush_on_shutdown: Optional[bool] = None,  # Optional: Auto-flush on exit (default: True)
    logger: Optional[logging.Logger] = None,   # Optional: Custom logger (default: logging.getLogger("klime"))
    on_error: Optional[Callable] = None,       # Optional: Callback for batch failures
    on_success: Optional[Callable] = None      # Optional: Callback for successful sends
)

Methods

track(event: str, properties: Optional[Dict] = None, user_id: Optional[str] = None, group_id: Optional[str] = None) -> None

Track an event. Events can be attributed in two ways:

  • User events: Provide user_id to track user activity (most common)
  • Group events: Provide group_id without user_id for organization-level events
# User event (most common)
client.track('Button Clicked', {
    'button_name': 'Sign up',
    'plan': 'pro'
}, user_id='user_123')

# Group event (for webhooks, cron jobs, system events)
client.track('Events Received', {
    'count': 100,
    'source': 'webhook'
}, group_id='org_456')

Note: The group_id parameter can also be combined with user_id for multi-tenant scenarios where you need to specify which organization context a user event occurred in.

identify(user_id: str, traits: Optional[Dict] = None) -> None

Identify a user with traits.

client.identify('user_123', {
    'email': 'user@example.com',
    'name': 'Stefan'
})

group(group_id: str, traits: Optional[Dict] = None, user_id: Optional[str] = None) -> None

Associate a user with a group and/or set group traits.

# Associate user with a group and set group traits (most common)
client.group('org_456', {
    'name': 'Acme Inc',
    'plan': 'enterprise'
}, user_id='user_123')

# Just link a user to a group (traits already set or not needed)
client.group('org_456', user_id='user_123')

# Just update group traits (e.g., from a webhook or background job)
client.group('org_456', {
    'plan': 'enterprise',
    'employee_count': 50
})

flush() -> None

Manually flush queued events immediately.

client.flush()

shutdown() -> None

Gracefully shutdown the client, flushing remaining events.

client.shutdown()

queue_size() -> int

Return the number of events currently in the queue.

pending = client.queue_size()
print(f"{pending} events waiting to be sent")

Synchronous Methods

For cases where you need confirmation that events were sent (e.g., before process exit, in tests), use the synchronous variants:

track_sync(event, properties, user_id, group_id) -> BatchResponse

Track an event synchronously. Raises SendError on failure.

from klime import SendError

try:
    response = client.track_sync('Critical Action', {'key': 'value'}, user_id='user_123')
    print(f"Sent! Accepted: {response.accepted}")
except SendError as e:
    print(f"Failed to send: {e.message}")

identify_sync(user_id, traits) -> BatchResponse

Identify a user synchronously. Raises SendError on failure.

group_sync(group_id, traits, user_id) -> BatchResponse

Associate a user with a group synchronously. Raises SendError on failure.

Features

  • Automatic Batching: Events are automatically batched and sent every 2 seconds or when the batch size reaches 20 events
  • Automatic Retries: Failed requests are automatically retried with exponential backoff
  • Thread-Safe: Safe to use from multiple threads
  • Process Exit Handling: Automatically flushes events on process exit (via atexit)
  • Zero Dependencies: Uses only Python standard library

Performance

When you call track(), identify(), or group(), the SDK:

  1. Adds the event to a thread-safe queue (microseconds)
  2. Returns immediately without waiting for network I/O

Events are sent to Klime's servers in background threads. This means:

  • No network blocking: HTTP requests happen asynchronously in background threads
  • No latency impact: Tracking calls add < 1ms to your request handling time
  • Automatic batching: Events are queued and sent in batches (default: every 2 seconds or 20 events)
# This returns immediately - no HTTP request is made here
client.track('Button Clicked', {'button': 'signup'}, user_id='user_123')

# Your code continues without waiting
return {'success': True}

The only blocking operation is flush(), which waits for all queued events to be sent. This is typically only called during graceful shutdown.

Configuration

Default Values

  • flush_interval: 2000ms
  • max_batch_size: 20 events
  • max_queue_size: 1000 events
  • retry_max_attempts: 5 attempts
  • retry_initial_delay: 1000ms
  • flush_on_shutdown: True

Logging

The SDK uses Python's built-in logging module. By default, it logs to logging.getLogger("klime").

import logging

# Enable debug logging
logging.getLogger("klime").setLevel(logging.DEBUG)

# Or provide a custom logger
client = KlimeClient(
    write_key='your-write-key',
    logger=logging.getLogger("myapp.klime")
)

For Django/Flask, the SDK automatically integrates with your app's logging configuration.

Callbacks

def handle_error(error, events):
    # Report to your error tracking service
    sentry_sdk.capture_exception(error)
    print(f"Failed to send {len(events)} events: {error}")

def handle_success(response):
    print(f"Sent {response.accepted} events")

client = KlimeClient(
    write_key='your-write-key',
    on_error=handle_error,
    on_success=handle_success
)

Error Handling

The SDK automatically handles:

  • Transient errors (429, 503, network failures): Retries with exponential backoff
  • Permanent errors (400, 401): Logs error and drops event
  • Rate limiting: Respects Retry-After header

For synchronous operations, use *_sync() methods which raise SendError on failure:

from klime import KlimeClient, SendError

try:
    response = client.track_sync('Event', user_id='user_123')
except SendError as e:
    print(f"Failed: {e.message}, events: {len(e.events)}")