This recipe demonstrates how to build an intelligent weather assistant that combines:
- LLM-powered intent detection and natural language understanding
- OpenAI-compatible function calling for structured data retrieval
- Efficient context management and response generation
- Modern React frontend with real-time updates
- Advanced performance optimizations and monitoring
While this recipe focuses on weather data, the patterns demonstrated here can be adapted for various reporting and automation needs, such as:
- Financial reporting and market analysis
- Product recommendations and catalog queries
- Medical record summaries and health reports
- Customer service automation
- Real estate property analysis
- And many other domain-specific applications
We'll create a solution that showcases:
- MAX Serve capabilities with Llama 3
- Multi-stage LLM pipeline for natural interaction
- Function calling for external API integration
- Real-time frontend with Next.js and TypeScript
- Production-pattern optimizations:
  - Connection pooling for efficient API calls
  - Semantic caching with embeddings
  - Operation time tracking and monitoring
  - Comprehensive error handling and resilience
You'll learn how to:
- Build a multi-stage LLM pipeline for intent classification and response generation
- Implement efficient data fetching with connection pooling and concurrent requests
- Use semantic caching to improve response times and reduce API calls
- Monitor and optimize performance with detailed operation timing
- Handle errors gracefully with automatic retries and fallbacks
Please make sure your system meets our system requirements.
To proceed, ensure you have the `magic` CLI installed, version 0.7.2 or newer (check with `magic --version`):
curl -ssL https://magic.modular.com/ | bash
or update it via:
magic self-update
Then install `max-pipelines` via:
magic global install -u max-pipelines
For this recipe, you will also need:
- A valid Hugging Face token for accessing Llama 3
- A free API key from WeatherAPI for weather data
Set up your environment variables:
cp backend/.env.sample backend/.env
echo "HUGGING_FACE_HUB_TOKEN=your_hf_token" > backend/.env
echo "WEATHERAPI_API_KEY=your_api_key" >> backend/.env
- Download the code for this recipe using the `magic` CLI:
  magic init ai-weather-agent --from modular/max-recipes/ai-weather-agent
  cd ai-weather-agent
- Run the application:
  Make sure the ports 7999, 8001, and 8010 are available. You can adjust the port settings in the Procfile.
  magic run app
  Note that it may take a few minutes for models to be downloaded and compiled.
- Open http://localhost:3000 in your browser to see the UI when all of the services below are ready:
  - MAX Serve embedding on port 7999
  - MAX Serve Llama 3 on port 8000
  - Backend FastAPI on port 8001
  Then you can ask weather-related questions and get detailed reports. Actions to take:
  - Go ahead and send a chat request with "Hi".
  - Then proceed with a city name like "Vancouver". Notice that the app understands it's a city name and provides a weather report.
  - Check the observability for each involved step in the UI.
  - Try entering "yvr" and notice that the report is generated much faster this time. Keep reading to understand optimizations such as the semantic cache.
  - Try with more queries and check the results.
- Once you're done with the app, clean up the resources by running:
  magic run clean
The weather assistant uses a multi-tier architecture:
The architecture consists of several key components:
- Frontend (Next.js): A modern React application providing real-time chat interface and weather visualization
- Backend (FastAPI): Orchestrates the entire flow, handling request routing and response generation
- MAX Serve: Runs the Llama 3 model for intent detection, function calling, and response generation
- WeatherAPI: External service providing current weather conditions and forecasts
- Sentence Transformers: Uses the sentence-transformers/all-mpnet-base-v2 model to generate embeddings for semantic caching
- Semantic Cache: Stores recent query results to improve response times
Each component is designed to be independently scalable and maintainable. The backend uses FastAPI's async capabilities to handle concurrent requests efficiently, while MAX Serve provides high-performance inference for the LLM components.
Here's how a typical weather query flows through the system:
This sequence represents a complete query lifecycle:
- Initial request: User enters a weather-related question in the chat interface
- Cache check: Backend first checks if a similar query exists in the semantic cache
- Intent processing:
- LLM determines if the query is weather-related
- If weather-related, function calling extracts the city name
- City normalization:
- LLM standardizes city names (e.g., "NYC" → "New York City")
- Handles variations and abbreviations consistently
- Data gathering:
- Weather data is fetched from WeatherAPI
- Multiple data sources are queried concurrently
- Response generation:
- LLM generates a natural language response from the data
- Response is cached for future similar queries
- Display: Frontend renders both the text response and weather visualization
The entire process typically completes in 2-3 seconds, with cached responses returning in under 500ms.
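To make this lifecycle concrete, here is a simplified orchestration sketch. The `/chat` path and `ChatRequest` model are assumptions for illustration, not the recipe's exact code; the helpers it calls are covered in the sections that follow, and caching and timing are applied by decorators on those helpers.

```python
# Simplified orchestration sketch of the flow above. The /chat path and the
# ChatRequest model are assumptions for illustration, not the recipe's exact code.
@app.post("/chat")
async def chat(request: ChatRequest):
    timings = TimingCollector()

    # Intent processing: weather question or general chat?
    intent = await detect_intent(request.message, timings)
    if intent != "WEATHER_QUERY":
        # General chat path (semantic caching is applied by the helper's decorator).
        return {"reply": await generate_chat_response(request.message, timings)}

    # Function calling extracts the city and gathers weather data concurrently.
    data = await handle_function_calling(request.message)

    # Response generation from the collected data.
    reply = await analyze_weather_data(request.message, data)

    # Return the report together with per-stage timings for the UI.
    return {"reply": reply, "data": data, "timings": await timings.get_timings()}
```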
The backend implements a sophisticated multi-stage LLM pipeline that processes user queries through several key stages. Weather and space weather data are fetched concurrently for optimal performance:
async def fetch_all_weather_data(city: str) -> dict:
"""Fetch both weather and space weather data concurrently"""
weather_task = fetch_weather_data(city)
space_weather_task = fetch_space_weather()
results = await asyncio.gather(
weather_task,
space_weather_task
)
return {
"weather": results[0],
"space_weather": results[1]
}
The first stage determines whether the user is asking about weather or making general conversation:
# Example queries and their classifications:
# "What's the weather like in London?" -> "WEATHER_QUERY"
# "How are you doing today?" -> "GENERAL_CHAT"
# "Will it rain in Paris tomorrow?" -> "WEATHER_QUERY"
# "Tell me a joke" -> "GENERAL_CHAT"
@track_operation_time("intent_detection")
async def detect_intent(request_message: str, timing_collector: TimingCollector):
"""Detect if the user is asking about weather or just chatting"""
response = await llm_client.chat.completions.create(
model="modularai/Llama-3.1-8B-Instruct-GGUF",
messages=[
{"role": "system", "content": INTENT_PROMPT},
{"role": "user", "content": request_message},
],
temperature=0, # Use 0 for deterministic outputs
)
return response.choices[0].message.content.strip()
The intent classifier uses temperature=0 for consistent outputs and is wrapped with timing tracking for performance monitoring.
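The INTENT_PROMPT constant referenced above is not reproduced here; a prompt along the following lines would yield the classifications shown in the comments (an illustrative sketch, not the recipe's exact wording):

```python
# Illustrative intent prompt (assumed wording, not the recipe's exact constant).
INTENT_PROMPT = """You are an intent classifier for a weather assistant.
Classify the user's message as exactly one of:
- WEATHER_QUERY: the user asks about weather, forecasts, or conditions somewhere
- GENERAL_CHAT: anything else
Respond with only the label, nothing else."""
```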
After detecting weather-related intent, the backend normalizes city names using the LLM to handle variations and abbreviations:
CITY_NORMALIZATION_PROMPT = """Normalize the following city name to its standard form.
Examples:
- "NYC" -> "New York City"
- "SF" -> "San Francisco"
- "LA" -> "Los Angeles"
City: {city}
Respond with only the normalized city name, nothing else."""
async def normalize_city_name(city: str, timing_collector: TimingCollector) -> str:
"""Standardize city names for consistent API calls and caching"""
response = await llm_client.chat.completions.create(
model="modularai/Llama-3.1-8B-Instruct-GGUF",
messages=[
{"role": "system", "content": CITY_NORMALIZATION_PROMPT.format(city=city)},
],
max_tokens=50,
temperature=0, # Use 0 for consistent outputs
)
return response.choices[0].message.content.strip()
This normalization ensures:
- Consistent API calls for different variations of city names
- Better cache utilization by standardizing queries
- Improved user experience with flexible input handling
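For example, a hypothetical helper (the name `get_report_for` is illustrative, not part of the recipe) that wires the normalizer in front of the data fetch:

```python
# Hypothetical wiring: normalize before fetching so that "yvr", "YVR", and
# "Vancouver" all map to the same WeatherAPI query and semantic-cache entry.
async def get_report_for(raw_city: str, timing_collector: TimingCollector) -> dict:
    city = await normalize_city_name(raw_city, timing_collector)
    return await fetch_all_weather_data(city)
```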
When a weather query is detected, the backend uses OpenAI-compatible function calling to structure the request and fetch relevant data:
TOOLS = [
{
"type": "function",
"function": {
"name": "get_weather",
"description": "Get current weather and forecast data for a city",
"parameters": {
"type": "object",
"properties": {
"city": {
"type": "string",
"description": "The city name to get weather for",
}
},
"required": ["city"],
},
},
},
{
"type": "function",
"function": {
"name": "get_air_quality",
"description": "Get air quality data for a city",
"parameters": {
"type": "object",
"properties": {
"city": {"type": "string"},
"include_forecast": {"type": "boolean", "default": false}
},
"required": ["city"],
},
},
}
]
async def handle_function_calling(message: str) -> dict:
"""Extract structured data requirements from natural language"""
response = await llm_client.chat.completions.create(
model="modularai/Llama-3.1-8B-Instruct-GGUF",
messages=[{"role": "user", "content": message}],
tools=TOOLS,
tool_choice="auto",
)
    # Process tool calls and fetch data; the model may return no tool calls
    # if no external data is needed.
    results = {}
    tool_calls = response.choices[0].message.tool_calls or []
    for tool_call in tool_calls:
func_name = tool_call.function.name
arguments = json.loads(tool_call.function.arguments)
if func_name == "get_weather":
results["weather"] = await fetch_weather_data(arguments["city"])
elif func_name == "get_air_quality":
results["air_quality"] = await fetch_air_quality(arguments["city"])
return results
The final stage generates a natural language report from the collected weather data:
WEATHER_ANALYSIS_PROMPT = """Given the user request about weather:
User: {user}
Analyze the following weather data and provide a natural, conversational summary:
Weather data: {weather_data}
Focus on:
1. Current conditions and how they feel
2. Notable patterns or changes in the forecast
3. Any relevant warnings or recommendations
"""
async def analyze_weather_data(request_message: str, weather_data: dict):
"""Generate a natural language report from weather data"""
content = WEATHER_ANALYSIS_PROMPT.format(
user=request_message,
weather_data=str(weather_data)
)
response = await llm_client.chat.completions.create(
model="modularai/Llama-3.1-8B-Instruct-GGUF",
messages=[{"role": "system", "content": content}],
max_tokens=512,
temperature=0.7, # Slightly higher for more natural language
)
return response.choices[0].message.content
The backend implements several advanced techniques to optimize performance and monitor system behavior:
The backend implements efficient HTTP connection pooling for external API calls:
@asynccontextmanager
async def get_http_client():
"""Connection pooling: Shared http client for better connection reuse."""
async with httpx.AsyncClient(
timeout=httpx.Timeout(30.0, connect=10.0),
limits=httpx.Limits(max_keepalive_connections=5),
) as client:
yield client
This optimization provides:
- Connection reuse to reduce TCP handshake overhead
- Configurable connection pool size (5 keepalive connections)
- Separate timeouts for connection (10s) and operations (30s)
- Automatic connection lifecycle management through context manager
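To illustrate how the pooled client is used, here is a rough usage sketch in which two WeatherAPI endpoints share one client. The URLs, query parameters, and the WEATHERAPI_API_KEY constant (assumed to be loaded from the environment) are illustrative, not the recipe's exact code.

```python
import asyncio

# Rough usage sketch: both requests reuse the same pooled client. The URLs,
# parameters, and WEATHERAPI_API_KEY constant are assumptions for illustration.
async def fetch_weather_and_air_quality(city: str) -> dict:
    async with get_http_client() as client:
        forecast_resp, current_resp = await asyncio.gather(
            client.get(
                "https://api.weatherapi.com/v1/forecast.json",
                params={"key": WEATHERAPI_API_KEY, "q": city, "days": 3},
            ),
            client.get(
                "https://api.weatherapi.com/v1/current.json",
                params={"key": WEATHERAPI_API_KEY, "q": city, "aqi": "yes"},
            ),
        )
        forecast_resp.raise_for_status()
        current_resp.raise_for_status()
        return {"forecast": forecast_resp.json(), "current": current_resp.json()}
```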
The backend uses asyncio locks and precise timing for operation tracking:
class TimingCollector:
def __init__(self, lock: asyncio.Lock = None):
self._timings: List[Dict[str, Union[str, float]]] = []
self._lock = lock or asyncio.Lock()
async def add_timing(self, operation: str, duration_ms: float):
async with self._lock:
self._timings.append({
"operation": operation,
"duration_ms": duration_ms
})
async def get_timings(self) -> List[Dict[str, Union[str, float]]]:
async with self._lock:
return self._timings.copy()
This system helps:
- Safely track concurrent operations with asyncio locks
- Provide accurate timing for each pipeline stage
- Enable real-time performance monitoring in the UI
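The track_operation_time decorator applied throughout the pipeline is not shown above; a minimal sketch of how it could feed the TimingCollector follows. The wiring (locating the collector among the call's arguments) is an assumption, not the recipe's exact implementation.

```python
import functools
import time

def track_operation_time(operation: str):
    """Sketch: time the wrapped coroutine and record the duration in the
    TimingCollector found among its arguments (assumed wiring)."""
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            # Look for a TimingCollector among positional and keyword arguments.
            collector = next(
                (a for a in (*args, *kwargs.values()) if isinstance(a, TimingCollector)),
                None,
            )
            start = time.perf_counter()
            try:
                return await func(*args, **kwargs)
            finally:
                if collector is not None:
                    duration_ms = (time.perf_counter() - start) * 1000
                    await collector.add_timing(operation, duration_ms)
        return wrapper
    return decorator
```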
To reduce API calls and improve response times, the backend implements semantic caching using embeddings:
class SemanticCache:
def __init__(self, threshold=0.75, ttl_seconds=CACHE_TTL):
self.threshold = threshold
self.ttl_seconds = ttl_seconds
self.cache: Dict[Tuple[float, ...], Tuple[Any, datetime]] = {}
        self._lock = asyncio.Lock()
async def _compute_embedding(self, text: str) -> np.ndarray:
"""Get embeddings from MAX Serve embedding endpoint"""
response = await embedding_client.embeddings.create(
model=EMBEDDING_MODEL,
input=text
)
embedding = np.array(response.data[0].embedding)
return embedding
def _cosine_similarity(self, a: np.ndarray, b: np.ndarray) -> float:
"""Calculate cosine similarity between two vectors"""
return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
async def get(self, text: str, normalized_city: str = None) -> Tuple[bool, Any]:
"""Try to find a semantically similar cached result"""
async with self._lock:
now = datetime.now()
# Clean expired entries
expired = []
for embedding_tuple, (value, timestamp) in self.cache.items():
if (now - timestamp).total_seconds() > self.ttl_seconds:
expired.append(embedding_tuple)
for emb in expired:
del self.cache[emb]
# Use normalized city if provided, otherwise use original text
query_text = normalized_city if normalized_city else text
query_embedding = await self._compute_embedding(query_text)
query_tuple = tuple(query_embedding.tolist())
# Find most similar cached query
max_similarity = 0
best_match = None
for cached_embedding_tuple, (value, _) in self.cache.items():
cached_embedding = np.array(cached_embedding_tuple)
similarity = self._cosine_similarity(query_embedding, cached_embedding)
if similarity > max_similarity:
max_similarity = similarity
best_match = value
if max_similarity > self.threshold:
return True, best_match
return False, None
# Example cache decorator usage
@semantic_cache(threshold=0.90, ttl_seconds=CACHE_TTL)
@track_operation_time("chat_response")
async def generate_chat_response(request_message: str, timing_collector: TimingCollector):
"""Generate a general chat response with caching"""
response = await llm_client.chat.completions.create(
model="modularai/Llama-3.1-8B-Instruct-GGUF",
messages=[
{
"role": "system",
"content": "You are a friendly weather assistant. Provide helpful and concise responses.",
},
{"role": "user", "content": request_message},
],
max_tokens=256,
temperature=0,
)
return response.choices[0].message.content
The semantic cache provides several key features:
- Thread-safe operations with asyncio locks
- Automatic cleanup of expired entries
- Configurable similarity threshold
- Support for normalized city names
- Integration with timing collection
Example similar queries that would hit the cache:
- "What's the weather in NYC?" ↔ "Tell me the weather in New York City"
- "Is it raining in London?" ↔ "What's the precipitation in London?"
- "Temperature in Paris" ↔ "How hot is it in Paris?"
The backend implements comprehensive error handling:
@app.exception_handler(HTTPException)
async def http_exception_handler(request, exc):
return JSONResponse(
status_code=exc.status_code,
content={"detail": str(exc.detail)}
)
@app.exception_handler(Exception)
async def general_exception_handler(request, exc):
logger.exception("Unexpected error")
return JSONResponse(
status_code=500,
content={"detail": "An unexpected error occurred"}
)
async def wait_for_llm_server(base_url: str):
"""Ensure LLM server is healthy before starting"""
@retry(
stop=stop_after_attempt(20),
wait=wait_fixed(60),
retry=(
retry_if_exception_type(httpx.RequestError) |
retry_if_result(lambda x: x.status_code != 200)
)
)
async def _check_health():
async with httpx.AsyncClient() as client:
return await client.get(f"{base_url}/health")
    return await _check_health()
The system includes features such as:
- Graceful degradation when services are unavailable (see the sketch below)
- Error logging and monitoring
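As one example of graceful degradation, the weather fetch could fall back to a data-free reply instead of surfacing a 500 when WeatherAPI is unreachable. This is a hypothetical helper, not the recipe's code:

```python
from typing import Optional

# Hypothetical fallback: degrade to a text-only answer instead of failing
# the whole request when the weather provider is unreachable.
async def fetch_weather_data_or_none(city: str) -> Optional[dict]:
    try:
        return await fetch_weather_data(city)
    except (httpx.RequestError, httpx.HTTPStatusError):
        logger.warning("WeatherAPI unavailable; responding without live weather data")
        return None
```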
The frontend is built with Next.js 14 and TypeScript, featuring three main components:
interface ChatMessage {
role: 'user' | 'assistant';
content: string;
data?: WeatherData;
timings?: Array<{
operation: string;
duration_ms: number;
}>;
}
export default function Chat() {
const [messages, setMessages] = useState<ChatMessage[]>([]);
const [isLoading, setIsLoading] = useState(false);
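  // ...submit handler and JSX rendering omitted in this excerpt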
}
interface WeatherData {
weather: {
location: {
name: string;
country: string;
localtime: string;
};
current: {
temperature: number;
condition: string;
feels_like: number;
humidity: number;
wind_kph: number;
};
forecast: Array<{
date: string;
max_temp: number;
min_temp: number;
condition: string;
}>;
};
air_quality?: {
aqi: number;
pm2_5: number;
};
}
interface OperationTiming {
operation: string;
duration_ms: number;
}
const operationLabels: Record<string, string> = {
'intent_detection': 'Analyzing question',
'weather_data_fetch': 'Getting weather data',
'weather_analysis': 'Creating weather report'
};
Common issues and solutions:
- LLM server connection issues
  - Ensure MAX Serve is running (`magic run app`)
  - Check logs for any GPU-related errors
  - Verify Hugging Face token is set correctly
- Weather API problems
  - Verify API key in `.env`
  - Check rate limits on free tier
  - Ensure city names are valid
- Frontend development
  - Clear `.next` cache if builds fail
  - Run `npm install` after pulling updates
  - Check browser console for errors
- Performance issues
  - Monitor GPU memory usage
  - Adjust batch sizes if needed
  - Check semantic cache hit rates
This recipe demonstrates patterns that can be applied to many domains beyond weather reporting:
- Replace WeatherAPI with your domain's data source (e.g., financial data, product catalogs, medical records)
- Modify function definitions to match your data schema
- Adjust prompts for domain-specific report generation
The following components can be directly reused:
- Intent classification system for routing queries
- Semantic caching for similar questions in your domain
- Connection pooling for any external API calls
- Performance monitoring and error handling infrastructure
- Financial reports
  - Replace weather data with market data
  - Adapt function calling for stock symbols (see the sketch after this list)
  - Generate natural language market analysis
- Product recommendations
  - Use a product catalog as the data source
  - Modify functions for product queries
  - Generate personalized recommendations
- Medical summaries
  - Connect to health record systems
  - Add functions for patient data retrieval
  - Generate patient-friendly health reports
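For instance, a financial-reporting adaptation could swap the weather tools for something like this hypothetical definition (the `get_stock_quote` name and fields are illustrative, not part of the recipe):

```python
# Hypothetical tool definition for a financial-reporting adaptation of this recipe.
STOCK_TOOLS = [
    {
        "type": "function",
        "function": {
            "name": "get_stock_quote",
            "description": "Get the latest price and daily change for a stock symbol",
            "parameters": {
                "type": "object",
                "properties": {
                    "symbol": {
                        "type": "string",
                        "description": "Ticker symbol, e.g. AAPL",
                    }
                },
                "required": ["symbol"],
            },
        },
    }
]
```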
The multi-stage LLM pipeline pattern, combined with efficient data handling and caching, provides a robust foundation for building various domain-specific AI assistants.
This recipe demonstrates how to build a production-ready AI assistant that combines:
- Efficient multi-stage LLM pipelines for natural language understanding and generation
- Advanced performance optimizations like semantic caching and connection pooling
- Robust error handling and monitoring capabilities
- Modern frontend with real-time updates and performance tracking
The patterns and components shown here provide a solid foundation for building your own domain-specific AI assistants. Whether you're working with financial data, medical records, or product catalogs, the architecture can be adapted while maintaining performance, reliability, and user experience.
Now that you've built a foundation for AI-powered applications, you can explore more advanced deployments and features:
- Explore MAX documentation for more features
- Deploy MAX Serve on AWS, GCP or Azure
- Join our Modular Forum and Discord community
We're excited to see what you'll build with MAX! Share your projects with us using #ModularAI
on social media.