Tesla Data Logging on Raspberry Pi with Grok: Part 2 - Implementation

Published on October 02, 2025 by Claudio Cabete

In Part 1, we used Grok, created by xAI, to set up a Raspberry Pi with a domain, Nginx, FastAPI, SSL, and a Tesla Developer profile. Now, in Part 2, we’ll explore the tesla_mqtt_logger.py script, which Grok helped design to collect Tesla vehicle data (battery level, charging state, odometer, and location) and output it to MQTT or CSV files. We’ll break down each function and explain how to choose between MQTT and CSV output.

Prerequisites

  • Completed Part 1 setup.
  • Python dependencies installed: bash cd ~/homatica_server source venv/bin/activate pip install paho-mqtt requests pyjwt pyyaml

The Script: tesla_mqtt_logger.py

Grok crafted this script to interact with the Tesla Fleet API. Save it to /home/pi/py_scripts/tesla_mqtt_logger.py:

import time
import json
import yaml
import logging
import os
import requests
import jwt
from datetime import datetime
import paho.mqtt.client as mqtt
import csv

# Logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Load config
with open("/home/pi/py_scripts/config.yaml") as f:
    config = yaml.safe_load(f)

# Tesla Fleet API credentials
CLIENT_ID = config['tesla_client']['client_id']
CLIENT_SECRET = config['tesla_client']['client_secret']
CLIENT_EMAIL = config['tesla_client']['client_email']
REDIRECT_URI = 'https://yourdomain.com/auth/callback'
AUTH_CODE_FILE = '/home/pi/py_scripts/tesla_auth_code.txt'
TOKEN_URL = 'https://auth.tesla.com/oauth2/v3/token'
FLEET_API_BASE = 'https://fleet-api.prd.na.vn.cloud.tesla.com/api/1'
PARTNER_AUTH_URL = 'https://fleet-auth.prd.na.vn.cloud.tesla.com/oauth2/v3/token'
TOKEN_FILE = '/home/pi/py_scripts/tesla_token.json'
PRIVATE_KEY_PATH = '/home/pi/py_scripts/tesla-private-key.pem'
DOMAIN = 'yourdomain.com'

# MQTT config
MQTT_BROKER = config["mqtt"]["broker"]
MQTT_PORT = config.get("mqtt", {}).get("port", 1883)
MQTT_USER = config["mqtt"].get("username")
MQTT_PASS = config["mqtt"].get("password")
MQTT_TOPIC_PREFIX = 'tesla/model3/'
POLL_INTERVAL_SECONDS = 900
WAKE_RETRY_ATTEMPTS = 3
WAKE_RETRY_DELAY = 10

# CSV config
CSV_FILE = '/home/pi/py_scripts/tesla_data.csv'
USE_CSV = False  # Set to True to save to CSV instead of MQTT

# Generate partner JWT for registration
def generate_partner_jwt(private_key):
    payload = {
        "iss": CLIENT_ID,
        "aud": "https://fleet-api.prd.na.vn.cloud.tesla.com",
        "sub": CLIENT_EMAIL,
        "exp": int(time.time()) + 3600,
        "iat": int(time.time())
    }
    try:
        return jwt.encode(payload, private_key, algorithm='ES256')
    except Exception as e:
        logger.error(f"Failed to generate JWT: {e}")
        raise

# Get partner token for registration
def get_partner_token():
    with open(PRIVATE_KEY_PATH, 'r') as f:
        private_key = f.read()
    partner_jwt = generate_partner_jwt(private_key)
    data = {
        "grant_type": "client_credentials",
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "audience": "https://fleet-api.prd.na.vn.cloud.tesla.com",
        "scope": "openid user_data vehicle_device_data vehicle_cmds vehicle_charging_cmds"
    }
    try:
        response = requests.post(PARTNER_AUTH_URL, data=data, timeout=10)
        response.raise_for_status()
        return response.json()["access_token"]
    except requests.exceptions.RequestException as e:
        logger.error(f"Partner token request failed: {e}")
        raise

# Generate OAuth URL
def get_auth_url():
    params = {
        "response_type": "code",
        "client_id": CLIENT_ID,
        "redirect_uri": REDIRECT_URI,
        "scope": "openid email offline_access vehicle_device_data vehicle_cmds vehicle_charging_cmds vehicle_location",
        "state": "homatica_state"
    }
    from urllib.parse import urlencode
    return f"https://auth.tesla.com/oauth2/v3/authorize?{urlencode(params)}"

# Get access token
def get_access_token(auth_code):
    data = {
        "grant_type": "authorization_code",
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "code": auth_code,
        "redirect_uri": REDIRECT_URI
    }
    try:
        response = requests.post(TOKEN_URL, json=data, timeout=10)
        response.raise_for_status()
        token_data = response.json()
        with open(TOKEN_FILE, 'w') as f:
            json.dump(token_data, f)
        return token_data["access_token"], token_data["refresh_token"]
    except requests.exceptions.RequestException as e:
        logger.error(f"Token request failed: {e}")
        raise

# Refresh access token
def refresh_access_token(refresh_token):
    data = {
        "grant_type": "refresh_token",
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "refresh_token": refresh_token
    }
    try:
        response = requests.post(TOKEN_URL, json=data, timeout=10)
        response.raise_for_status()
        token_data = response.json()
        with open(TOKEN_FILE, 'w') as f:
            json.dump(token_data, f)
        return token_data["access_token"]
    except requests.exceptions.RequestException as e:
        logger.error(f"Token refresh failed: {e}")
        raise

# Wake vehicle
def wake_vehicle(access_token, vehicle_id):
    headers = {"Authorization": f"Bearer {access_token}"}
    try:
        response = requests.post(f"{FLEET_API_BASE}/vehicles/{vehicle_id}/wake_up", headers=headers, timeout=10)
        response.raise_for_status()
        logger.info(f"Wake response: {response.json()}")
    except requests.exceptions.RequestException as e:
        logger.error(f"Wake request failed: {e}")
        raise

# Get vehicle list
def get_vehicle_id(access_token):
    headers = {"Authorization": f"Bearer {access_token}"}
    try:
        response = requests.get(f"{FLEET_API_BASE}/vehicles", headers=headers, timeout=10)
        response.raise_for_status()
        vehicles = response.json()["response"]
        if not vehicles:
            raise Exception("No vehicles found")
        return vehicles[0]["id"]
    except requests.exceptions.RequestException as e:
        logger.error(f"Vehicle list request failed: {e}")
        raise

# Get vehicle data with retry on 408
def get_vehicle_data(access_token, vehicle_id):
    headers = {"Authorization": f"Bearer {access_token}"}
    for attempt in range(WAKE_RETRY_ATTEMPTS):
        try:
            response = requests.get(f"{FLEET_API_BASE}/vehicles/{vehicle_id}/vehicle_data", headers=headers, timeout=10)
            if response.status_code == 408:
                logger.info(f"Vehicle asleep, attempt {attempt + 1}/{WAKE_RETRY_ATTEMPTS}. Sending wake command...")
                wake_vehicle(access_token, vehicle_id)
                time.sleep(WAKE_RETRY_DELAY)
                continue
            response.raise_for_status()
            return response.json()["response"]
        except requests.exceptions.RequestException as e:
            logger.error(f"Vehicle data request failed: {e}")
            if attempt < WAKE_RETRY_ATTEMPTS - 1:
                logger.info(f"Retrying after {WAKE_RETRY_DELAY} seconds...")
                time.sleep(WAKE_RETRY_DELAY)
                continue
            raise
    raise Exception("Failed to wake vehicle after retries")

# MQTT setup
def setup_mqtt():
    client = mqtt.Client(protocol=mqtt.MQTTv5)
    if MQTT_USER and MQTT_PASS:
        client.username_pw_set(MQTT_USER, MQTT_PASS)
    try:
        client.connect(MQTT_BROKER, MQTT_PORT, 60)
        logger.info(f"Connected to MQTT broker at {MQTT_BROKER}:{MQTT_PORT}")
    except Exception as e:
        logger.error(f"MQTT connection failed: {e}")
        raise
    return client

# Publish to MQTT
def publish_to_mqtt(client, topic, payload):
    try:
        client.publish(MQTT_TOPIC_PREFIX + topic, json.dumps(payload), retain=True)
        logger.info(f"Published to {MQTT_TOPIC_PREFIX + topic}: {payload}")
    except Exception as e:
        logger.error(f"MQTT publish failed: {e}")

# Save to CSV
def save_to_csv(data):
    file_exists = os.path.exists(CSV_FILE)
    with open(CSV_FILE, 'a', newline='') as f:
        writer = csv.writer(f)
        if not file_exists:
            writer.writerow(['timestamp', 'battery_level', 'charging_state', 'odometer_miles', 'latitude', 'longitude'])
        writer.writerow([
            data['timestamp'],
            data.get('battery_level', ''),
            data.get('charging_state', ''),
            data.get('odometer_miles', ''),
            data.get('latitude', ''),
            data.get('longitude', '')
        ])

# Main loop
def main():
    access_token = None
    refresh_token = None

    # Load existing token if available
    if os.path.exists(TOKEN_FILE):
        with open(TOKEN_FILE, 'r') as f:
            token_data = json.load(f)
        access_token = token_data.get("access_token")
        refresh_token = token_data.get("refresh_token")

    # Try refreshing token if it exists
    if refresh_token:
        try:
            access_token = refresh_access_token(refresh_token)
            logger.info("Access token refreshed")
        except Exception as e:
            logger.error(f"Token refresh failed: {e}")
            access_token = None

    # Get new token if needed
    if not access_token:
        if os.path.exists(AUTH_CODE_FILE):
            with open(AUTH_CODE_FILE, 'r') as f:
                auth_code = f.read().strip()
            try:
                access_token, refresh_token = get_access_token(auth_code)
                os.remove(AUTH_CODE_FILE)
                logger.info(f"Removed {AUTH_CODE_FILE}")
            except Exception as e:
                logger.error(f"Failed to get access token: {e}")
                access_token = None

        if not access_token:
            logger.info(f"Auth code file missing, calling https://yourdomain.com/tesla/login to generate {AUTH_CODE_FILE}")
            try:
                response = requests.get('https://yourdomain.com/tesla/login', timeout=10)
                response.raise_for_status()
                logger.info("Login endpoint called successfully, waiting for auth code file...")
            except requests.exceptions.RequestException as e:
                logger.error(f"Failed to call login endpoint: {e}")
                raise
            for _ in range(12):
                if os.path.exists(AUTH_CODE_FILE):
                    break
                time.sleep(5)
            if not os.path.exists(AUTH_CODE_FILE):
                logger.error(f"Auth code file {AUTH_CODE_FILE} not created after calling login endpoint")
                raise Exception("Auth code file not created")
            with open(AUTH_CODE_FILE, 'r') as f:
                auth_code = f.read().strip()
            access_token, refresh_token = get_access_token(auth_code)
            os.remove(AUTH_CODE_FILE)
            logger.info(f"Removed {AUTH_CODE_FILE}")

    # Register account
    try:
        partner_token = get_partner_token()
        headers = {
            "Authorization": f"Bearer {partner_token}",
            "Content-Type": "application/json"
        }
        data = {"domain": DOMAIN}
        response = requests.post(f"{FLEET_API_BASE}/partner_accounts", headers=headers, json=data, timeout=10)
        response.raise_for_status()
        logger.info("Account registered successfully")
    except Exception as e:
        logger.error(f"Registration error: {e}")
        logger.info("Assuming account already registered, continuing...")

    vehicle_id = get_vehicle_id(access_token)
    mqtt_client = None
    if not USE_CSV:
        mqtt_client = setup_mqtt()
        mqtt_client.loop_start()

    while True:
        try:
            data = get_vehicle_data(access_token, vehicle_id)
            logger.info(f"Full vehicle data: {json.dumps(data, indent=2)}")
            if 'drive_state' not in data:
                logger.error("drive_state missing from API response")
                time.sleep(POLL_INTERVAL_SECONDS)
                continue

            battery = data['charge_state']['battery_level']
            charging = data['charge_state']['charging_state']
            odometer = data['vehicle_state']['odometer']
            location = data['drive_state'].get('latitude'), data['drive_state'].get('longitude')
            logger.info(f"Location data: latitude={location[0]}, longitude={location[1]}")

            timestamp = datetime.now().isoformat()
            data_dict = {
                'timestamp': timestamp,
                'battery_level': battery,
                'charging_state': charging,
                'odometer_miles': odometer,
                'latitude': location[0] if location[0] is not None else '',
                'longitude': location[1] if location[1] is not None else ''
            }

            if USE_CSV:
                save_to_csv(data_dict)
            else:
                publish_to_mqtt(mqtt_client, 'battery', {'level': battery, 'timestamp': timestamp})
                publish_to_mqtt(mqtt_client, 'charging', {'state': charging, 'timestamp': timestamp})
                publish_to_mqtt(mqtt_client, 'odometer', {'miles': odometer, 'timestamp': timestamp})
                if location[0] is not None and location[1] is not None:
                    publish_to_mqtt(mqtt_client, 'location', {'lat': location[0], 'lon': location[1], 'timestamp': timestamp})
                else:
                    logger.warning("Location data not published: latitude or longitude is None")

        except Exception as e:
            logger.error(f"Polling error: {e}")
            try:
                access_token = refresh_access_token(refresh_token)
                logger.info("Access token refreshed after error")
            except Exception as e:
                logger.error(f"Token refresh failed: {e}")
                break

        time.sleep(POLL_INTERVAL_SECONDS)

if __name__ == '__main__':
    main()

How Grok Helped

I, Grok, created by xAI, wrote and debugged this script based on questions like: “How do I write a Python script to log Tesla vehicle data to MQTT?” and “Why isn’t my Tesla location data showing?” I provided the code, fixed issues like missing location data, and added the CSV output option.

Script Breakdown

Here’s what each function does:

  1. generate_partner_jwt(private_key):
  2. Creates a JSON Web Token (JWT) for Tesla Fleet API partner authentication using the ECDSA private key.
  3. Includes CLIENT_ID, CLIENT_EMAIL, and expiration timestamps.

  4. get_partner_token():

  5. Uses the JWT to request a partner access token for registering your app with Tesla.

  6. get_auth_url():

  7. Generates the OAuth URL for Tesla’s authentication server, including necessary scopes.

  8. get_access_token(auth_code):

  9. Exchanges the authorization code (from the FastAPI /auth/callback) for access and refresh tokens.
  10. Saves tokens to tesla_token.json.

  11. refresh_access_token(refresh_token):

  12. Refreshes the access token when it expires, updating tesla_token.json.

  13. wake_vehicle(access_token, vehicle_id):

  14. Sends a wake-up command to the vehicle to ensure it’s online for data retrieval.

  15. get_vehicle_id(access_token):

  16. Retrieves the vehicle ID from the /vehicles endpoint, selecting the first vehicle.

  17. get_vehicle_data(access_token, vehicle_id):

  18. Fetches vehicle data (battery, charging state, odometer, location) with retries for asleep vehicles (HTTP 408).

  19. setup_mqtt():

  20. Initializes an MQTT client with credentials from config.yaml and connects to the broker.

  21. publish_to_mqtt(client, topic, payload):

    • Publishes data to MQTT topics like tesla/model3/battery.
  22. save_to_csv(data):

    • Appends data to tesla_data.csv with columns for timestamp, battery level, charging state, odometer, and location.
  23. main():

    • Manages the script’s flow:
    • Handles token loading and refreshing.
    • Calls https://yourdomain.com/tesla/login for auth codes.
    • Registers the partner account.
    • Polls vehicle data every 15 minutes.
    • Outputs to MQTT or CSV based on USE_CSV.

Output Options

Option 1: Publish to MQTT

  • Default: Set USE_CSV = False.
  • Setup:
  • Configure config.yaml with MQTT details: yaml mqtt: broker: "your_broker_ip" port: 1883 username: "your_username" password: "your_password"
  • Install Mosquitto if using a local broker: bash sudo apt install mosquitto sudo systemctl enable mosquitto
  • Monitor with: bash mosquitto_sub -h localhost -t "tesla/model3/#" -u your_username -P your_password

Option 2: Save to CSV

  • Enable: Set USE_CSV = True.
  • Output: Data is saved to /home/pi/py_scripts/tesla_data.csv with columns: timestamp, battery_level, charging_state, odometer_miles, latitude, longitude.
  • View: bash cat /home/pi/py_scripts/tesla_data.csv

Running the Script

  1. Save the Script: bash nano /home/pi/py_scripts/tesla_mqtt_logger.py

  2. Run as a Service:

  3. Ask Grok: “How do I run a Python script as a systemd service on Raspberry Pi?”
  4. Response: bash sudo nano /etc/systemd/system/tesla-logger.service Add: ```ini [Unit] Description=Tesla MQTT Logger After=network.target

    [Service] User=pi WorkingDirectory=/home/pi/py_scripts ExecStart=/home/pi/homatica_server/venv/bin/python3 /home/pi/py_scripts/tesla_mqtt_logger.py Restart=always

    [Install] WantedBy=multi-user.target Then:bash sudo systemctl enable tesla-logger.service sudo systemctl start tesla-logger.service ```

  5. Monitor Logs: bash journalctl -u tesla-logger.service -f

Troubleshooting with Grok

  • Location Data Missing: Ask Grok: “Why is my Tesla API script not returning location data?” Check logs for drive_state and ensure location sharing is enabled in the Tesla app.
  • Auth Issues: Ask: “Why isn’t tesla_auth_code.txt being created?” Verify the FastAPI /tesla/login endpoint.
  • MQTT Issues: Ask: “How do I debug MQTT connection failures?” Test with mosquitto_sub.

Thanks to Grok, created by xAI, for making this script possible with its clear, AI-driven guidance. Choose MQTT for real-time monitoring or CSV for simple storage, and enjoy tracking your Tesla’s data!

Back to Blog