FastF1 can process large amounts of F1 data. This guide covers strategies to optimize performance, reduce loading times, and manage memory efficiently.
Caching Strategies
Caching is the most important optimization for FastF1. It prevents re-downloading data from the API on every script run.
Enable Caching
Always enable caching at the start of your script:
import fastf1
# Enable caching with a persistent directory
fastf1.Cache.enable_cache('/path/to/cache/directory')
# Or use a relative path
fastf1.Cache.enable_cache('./fastf1_cache')
Cache Location Best Practices
import os
import fastf1
# Option 1: User's home directory
cache_dir = os.path.expanduser('~/.fastf1_cache')
fastf1.Cache.enable_cache(cache_dir)
# Option 2: Project-specific cache
project_cache = os.path.join(os.getcwd(), 'cache')
fastf1.Cache.enable_cache(project_cache)
# Option 3: Temporary directory (cleared on reboot)
import tempfile
temp_cache = os.path.join(tempfile.gettempdir(), 'fastf1_cache')
fastf1.Cache.enable_cache(temp_cache)
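`Cache.enable_cache()` expects the directory to exist already and, in the FastF1 versions this sketch assumes, raises an error otherwise. The helper names `ensure_cache_dir` and `enable_cache_at` below are hypothetical and not part of FastF1; they only show one way to create the directory before enabling the cache.

```python
from pathlib import Path


def ensure_cache_dir(path):
    """Create the cache directory if it is missing and return its absolute path.

    Hypothetical helper for illustration; not part of FastF1.
    """
    cache_dir = Path(path).expanduser().resolve()
    cache_dir.mkdir(parents=True, exist_ok=True)
    return cache_dir


def enable_cache_at(path):
    """Create the directory, then point FastF1's cache at it."""
    import fastf1  # imported lazily so ensure_cache_dir works without FastF1

    cache_dir = ensure_cache_dir(path)
    fastf1.Cache.enable_cache(str(cache_dir))
    return cache_dir
```

Calling `enable_cache_at('~/.fastf1_cache')` at the top of a script would then replace the separate `expanduser` and `enable_cache` steps.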
Cache Management
import fastf1
import os
import shutil
# Check cache size
def get_cache_size(cache_dir):
    total_size = 0
    for dirpath, dirnames, filenames in os.walk(cache_dir):
        for f in filenames:
            fp = os.path.join(dirpath, f)
            total_size += os.path.getsize(fp)
    return total_size / (1024 * 1024)  # MB
cache_dir = './fastf1_cache'
fastf1.Cache.enable_cache(cache_dir)
print(f"Cache size: {get_cache_size(cache_dir):.2f} MB")
# Clear cache if needed
def clear_cache(cache_dir):
    if os.path.exists(cache_dir):
        shutil.rmtree(cache_dir)
    os.makedirs(cache_dir)
    print("Cache cleared")
# clear_cache(cache_dir) # Uncomment to clear
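Before clearing everything, it can be useful to see which part of the cache takes the space. The sketch below uses only the standard library and reports the size of each top-level entry; the function name `cache_size_by_entry` is hypothetical, and nothing here assumes a particular layout inside the cache directory.

```python
from pathlib import Path


def cache_size_by_entry(cache_dir):
    """Return {entry name: size in bytes} for each top-level item in cache_dir."""
    sizes = {}
    for entry in Path(cache_dir).iterdir():
        if entry.is_dir():
            # Sum all regular files below this subdirectory
            sizes[entry.name] = sum(
                f.stat().st_size for f in entry.rglob('*') if f.is_file()
            )
        else:
            sizes[entry.name] = entry.stat().st_size
    return sizes
```

Sorting the result, for example with `sorted(sizes.items(), key=lambda kv: kv[1], reverse=True)`, shows the largest entries first, so a single subdirectory can be removed instead of the whole cache.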
Cache Duration
Cached data remains valid indefinitely for completed sessions. For ongoing or recent sessions, you may want to refresh:
import fastf1
from datetime import datetime, timedelta
fastf1.Cache.enable_cache('./cache')
# For very recent sessions, you might want to re-download
session = fastf1.get_session(2024, 'Bahrain', 'R')
# Check if session was recent (within last 48 hours)
session_date = session.event['EventDate']
if datetime.now() - session_date < timedelta(hours=48):
    print("Recent session - consider clearing cache for latest data")
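The recency check can be factored into a small function so it is easy to reuse and test. This is a minimal sketch with a hypothetical helper name, `is_recent`; it assumes timezone-naive datetimes, as in the example above, and treats sessions dated in the future as not recent.

```python
from datetime import datetime, timedelta


def is_recent(session_date, now=None, window=timedelta(hours=48)):
    """Return True if session_date lies within `window` before `now`."""
    now = now or datetime.now()
    return timedelta(0) <= now - session_date < window
```

When a session is recent, one option is to pass `force_renew=True` to `fastf1.Cache.enable_cache()`, which should make FastF1 ignore existing cached data and download it again; check the signature in your installed version before relying on it.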
Efficient Data Loading
Load Only What You Need
Avoid loading unnecessary data:
import fastf1
fastf1.Cache.enable_cache('./cache')
session = fastf1.get_session(2023, 'Monza', 'R')
# Load with specific options
session.load(
    telemetry=True,   # Load telemetry data
    weather=True,     # Load weather data
    messages=True     # Load race control messages
)
# For lap time analysis only, telemetry isn't needed
session.load(
    telemetry=False,  # Skip telemetry loading
    weather=False,
    messages=False
)
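If several scripts share the same loading patterns, the keyword combinations can be kept in one place. The preset names and the `load_options` helper below are hypothetical; the keywords mirror the `session.load()` calls above, with `laps` assumed to be accepted in the same way.

```python
# Hypothetical presets; adjust to the analyses you actually run
LOAD_PRESETS = {
    'lap_times': dict(laps=True, telemetry=False, weather=False, messages=False),
    'weather': dict(laps=True, telemetry=False, weather=True, messages=False),
    'telemetry': dict(laps=True, telemetry=True, weather=False, messages=False),
}


def load_options(analysis):
    """Return a copy of the keyword arguments for the given analysis type."""
    try:
        return dict(LOAD_PRESETS[analysis])
    except KeyError:
        known = ', '.join(sorted(LOAD_PRESETS))
        raise ValueError(f"Unknown analysis {analysis!r}; expected one of: {known}") from None
```

A call such as `session.load(**load_options('lap_times'))` then skips telemetry, weather, and messages in one step.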
Selective Lap Loading
session.load() reads every lap of the session, so narrow the data right after loading and work with the smaller selection from then on:
# All laps are loaded here
session = fastf1.get_session(2023, 'Monaco', 'Q')
session.load()
# Filter immediately after loading
quick_laps = session.laps.pick_quicklaps()
# Or filter to specific drivers
driver_laps = session.laps.pick_drivers(['VER', 'HAM'])
Batch Processing Multiple Sessions
import fastf1
fastf1.Cache.enable_cache('./cache')
def analyze_session(year, event, session_type):
    """Analyze a single session efficiently"""
    try:
        session = fastf1.get_session(year, event, session_type)
        session.load(telemetry=False)  # Skip telemetry if not needed
        # Process only what's needed
        results = {
            'fastest_lap': session.laps.pick_fastest()['LapTime'],
            'winner': session.results.iloc[0]['Abbreviation']
        }
        return results
    except Exception as e:
        print(f"Error loading {year} {event} {session_type}: {e}")
        return None
# Analyze multiple sessions
events = ['Bahrain', 'Saudi Arabia', 'Australia']
for event in events:
    results = analyze_session(2023, event, 'R')
    print(f"{event}: {results}")
Memory Management
Working with Large Datasets
import fastf1
import gc # Garbage collector
fastf1.Cache.enable_cache('./cache')
# Process sessions one at a time
events = ['Bahrain', 'Saudi Arabia', 'Australia', 'Azerbaijan']
results = []
for event in events:
    session = fastf1.get_session(2023, event, 'R')
    session.load(telemetry=False)
    # Extract only what you need
    fastest_lap = session.laps.pick_fastest()
    results.append({
        'event': event,
        'fastest_time': fastest_lap['LapTime'],
        'driver': fastest_lap['Driver']
    })
    # Clear session data from memory
    del session
    gc.collect()  # Force garbage collection
# Now process results
import pandas as pd
df = pd.DataFrame(results)
print(df)
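To check whether a loop like this keeps memory flat, the standard library's `tracemalloc` can report the peak traced allocation of a single call. The helper name `peak_memory_mb` is hypothetical, and the figure only covers memory that `tracemalloc` can see, so treat it as an estimate rather than the full process footprint.

```python
import tracemalloc


def peak_memory_mb(func, *args, **kwargs):
    """Run func and return (result, peak traced memory in MiB)."""
    tracemalloc.start()
    try:
        result = func(*args, **kwargs)
        _, peak = tracemalloc.get_traced_memory()
    finally:
        tracemalloc.stop()
    return result, peak / (1024 * 1024)
```

Wrapping the per-event work in a function and calling `peak_memory_mb(process_event, event)` for each event (where `process_event` is your own function) makes it easy to compare runs with and without telemetry.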
Telemetry Data Optimization
import fastf1
fastf1.Cache.enable_cache('./cache')
session = fastf1.get_session(2023, 'Spa', 'Q')
session.load()
# Get telemetry for specific lap
lap = session.laps.pick_drivers('VER').pick_fastest()
telemetry = lap.get_telemetry()
# Work with only required columns
speed_data = telemetry[['Distance', 'Speed']]
# Downsample if needed for visualization
downsampled = telemetry.iloc[::10]  # Every 10th row
# Or keep every 4th row to cut the sample rate to a quarter
telemetry_quarter_rate = telemetry.iloc[::4]
Chunked Processing
import fastf1
import pandas as pd
fastf1.Cache.enable_cache('./cache')
def process_laps_in_chunks(session, chunk_size=100):
    """Process laps in chunks to manage memory"""
    all_laps = session.laps
    num_laps = len(all_laps)
    results = []
    for i in range(0, num_laps, chunk_size):
        chunk = all_laps.iloc[i:i+chunk_size]
        # Keep a per-driver sum and count so chunks of unequal size combine correctly
        chunk_results = chunk.groupby('Driver')['LapTime'].agg(['sum', 'count'])
        results.append(chunk_results)
    # Combine results: total lap time divided by total lap count per driver
    totals = pd.concat(results).groupby(level=0).sum()
    return (totals['sum'] / totals['count']).rename('LapTime')
session = fastf1.get_session(2023, 'Monza', 'R')
session.load(telemetry=False)
avg_times = process_laps_in_chunks(session)
print(avg_times)
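When per-chunk statistics are merged, averaging the chunk means is exact only if every chunk contributes the same number of laps. A plain-Python sketch of carrying a sum and a count instead, with made-up lap times in seconds and a hypothetical helper name:

```python
def combine_chunk_means(chunks):
    """Return the overall mean of all values, accumulated chunk by chunk."""
    total = 0.0
    count = 0
    for chunk in chunks:
        total += sum(chunk)
        count += len(chunk)
    return total / count if count else None


# Illustrative values only: three laps in the first chunk, one in the second
chunks = [[90.0, 92.0, 94.0], [100.0]]
mean_of_means = sum(sum(c) / len(c) for c in chunks) / len(chunks)
exact = combine_chunk_means(chunks)
print(mean_of_means)  # 96.0, the single slow lap is over-weighted
print(exact)          # 94.0, the true average over all four laps
```

The same idea carries over to pandas by aggregating `sum` and `count` per driver in each chunk and dividing the totals at the end.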
Data Loading Tips
Pre-load Data
For repeated analysis, pre-load sessions:
import fastf1
fastf1.Cache.enable_cache('./cache')
# Pre-load all sessions for a weekend
year, event = 2023, 'Monaco'
print("Pre-loading weekend data...")
for session_type in ['FP1', 'FP2', 'FP3', 'Q', 'R']:
    try:
        session = fastf1.get_session(year, event, session_type)
        session.load()
        print(f"✓ {session_type} loaded and cached")
    except Exception as e:
        print(f"✗ {session_type} failed: {e}")
print("\nPre-loading finished. Subsequent loads of cached sessions read from the local cache.")
Parallel Loading
import fastf1
from concurrent.futures import ThreadPoolExecutor
fastf1.Cache.enable_cache('./cache')
def load_session(year, event, session_type):
    """Load a single session"""
    try:
        session = fastf1.get_session(year, event, session_type)
        session.load(telemetry=False)
        return f"{event} {session_type} loaded"
    except Exception as e:
        return f"{event} {session_type} failed: {e}"
# Load multiple sessions in parallel
events = ['Bahrain', 'Saudi Arabia', 'Australia']
sessions = ['Q', 'R']
with ThreadPoolExecutor(max_workers=4) as executor:
    futures = []
    for event in events:
        for session_type in sessions:
            future = executor.submit(load_session, 2023, event, session_type)
            futures.append(future)
    for future in futures:
        print(future.result())
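The loop above collects results in submission order. A variation, sketched here with a hypothetical helper `run_in_parallel`, handles each result as soon as it finishes and keeps track of which job it belongs to. It works with any worker function and is not specific to FastF1.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def run_in_parallel(worker, jobs, max_workers=4):
    """Call worker(*job) for each job; return {job: result or raised exception}."""
    outcomes = {}
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_job = {executor.submit(worker, *job): job for job in jobs}
        for future in as_completed(future_to_job):
            job = future_to_job[future]
            try:
                outcomes[job] = future.result()
            except Exception as exc:
                # Keep the exception so one failed job does not hide the others
                outcomes[job] = exc
    return outcomes
```

With the `load_session` function from above, the jobs would be tuples such as `(2023, 'Bahrain', 'Q')`, passed as `run_in_parallel(load_session, jobs)`.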
Telemetry Frequency
Reduce telemetry frequency for faster processing:
import fastf1
from fastf1.core import Telemetry
fastf1.Cache.enable_cache('./cache')
# Set lower telemetry frequency globally
Telemetry.TELEMETRY_FREQUENCY = 5  # Hz (default is 'original', which keeps the source sample rate)
session = fastf1.get_session(2023, 'Spa', 'Q')
session.load()
lap = session.laps.pick_fastest()
telemetry = lap.get_telemetry() # Will be resampled to 5Hz
print(f"Telemetry points: {len(telemetry)}")
# Reset to original
Telemetry.TELEMETRY_FREQUENCY = 'original'
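Because this is a global setting, forgetting to reset it (or hitting an exception before the reset) affects every later telemetry call. One way to guard against that is a small context manager that always restores the previous value. The name `temporary_attr` is hypothetical and the helper is generic Python, not a FastF1 feature.

```python
from contextlib import contextmanager


@contextmanager
def temporary_attr(obj, name, value):
    """Set obj.<name> to value inside the with-block, then restore the old value."""
    original = getattr(obj, name)
    setattr(obj, name, value)
    try:
        yield
    finally:
        setattr(obj, name, original)


# Intended use with the example above (requires FastF1 and a loaded lap):
# with temporary_attr(Telemetry, 'TELEMETRY_FREQUENCY', 5):
#     telemetry = lap.get_telemetry()
```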
Logging and Debugging
Control Logging Verbosity
import fastf1
# Reduce logging output for cleaner console
fastf1.set_log_level('WARNING') # Only show warnings and errors
# Or increase for debugging
fastf1.set_log_level('DEBUG') # Show detailed information
# Available levels: 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'
Measure Load Times
import fastf1
import time
fastf1.Cache.enable_cache('./cache')
def timed_load(year, event, session_type):
    """Load session and measure time"""
    start = time.time()
    session = fastf1.get_session(year, event, session_type)
    session.load()
    elapsed = time.time() - start
    print(f"{event} {session_type}: {elapsed:.2f}s")
    return session
# First load (downloads data)
print("First load:")
session1 = timed_load(2023, 'Monaco', 'R')
# Second load (from cache)
print("\nSecond load (cached):")
session2 = timed_load(2023, 'Monaco', 'R')
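The same measurement can be wrapped around any block of code with a context manager. This sketch uses `time.perf_counter()`, which is generally better suited to measuring durations than `time.time()`; the `timer` name and the optional `sink` dictionary are hypothetical.

```python
import time
from contextlib import contextmanager


@contextmanager
def timer(label, sink=None):
    """Print how long the with-block took; optionally store it in sink[label]."""
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start
        if sink is not None:
            sink[label] = elapsed
        print(f"{label}: {elapsed:.2f}s")
```

For example, `with timer('Monaco R load'): session.load()` times only the load step, separately from `get_session()`.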
Best Practices Summary
- Always enable caching - Reduces load times from minutes to seconds
- Use a persistent cache directory - Don’t use temporary directories that get cleared
- Load selectively - Set telemetry=False if you don’t need it
- Filter early - Use pick_drivers() and pick_quicklaps() immediately after loading
- Process in chunks - For large datasets, process data in smaller batches
- Clear variables - Use del and gc.collect() in long-running scripts
- Monitor memory - Use a system monitor during development
- Reduce logging - Set the log level to WARNING in production
- Downsample telemetry - Use a lower frequency or skip points for visualization
- Parallel loading - Use ThreadPoolExecutor for loading multiple sessions
Optimized Script Example
import fastf1
import gc
# ✓ Enable caching
fastf1.Cache.enable_cache('./cache')
# ✓ Reduce logging verbosity
fastf1.set_log_level('WARNING')
# ✓ Load only needed data
session = fastf1.get_session(2023, 'Monza', 'R')
session.load(telemetry=False, weather=False, messages=False)
# ✓ Filter immediately
laps = session.laps.pick_quicklaps()
# ✓ Work with filtered data
results = laps.groupby('Driver')['LapTime'].mean()
# ✓ Clean up when done
del session, laps
gc.collect()
print(results)
Troubleshooting
Slow Loading
- Ensure caching is enabled
- Check internet connection speed
- Verify cache directory has write permissions
- Try loading without telemetry first
High Memory Usage
- Process sessions one at a time
- Avoid loading full season at once
- Reduce telemetry frequency
- Use chunked processing
- Clear variables with del and gc.collect()
Cache Not Working
- Verify cache directory exists and is writable
- Check if enable_cache() is called before get_session()
- Ensure sufficient disk space
- Clear corrupted cache files if needed
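The directory checks in this list can be automated. Below is a minimal sketch using only the standard library; the function name `check_cache_dir` and the 500 MB default threshold are assumptions chosen for illustration, not FastF1 requirements.

```python
import os
import shutil


def check_cache_dir(path, min_free_mb=500):
    """Return a list of problems found with the cache directory (empty if none)."""
    problems = []
    if not os.path.isdir(path):
        problems.append("directory does not exist")
        return problems
    if not os.access(path, os.W_OK):
        problems.append("directory is not writable")
    free_mb = shutil.disk_usage(path).free / (1024 * 1024)
    if free_mb < min_free_mb:
        problems.append(f"only {free_mb:.0f} MB free")
    return problems
```

Running `check_cache_dir('./fastf1_cache')` before `enable_cache()` and printing any returned problems gives a quick first diagnosis.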