ADK Multi-Agent with BigQuery and Notification MCP
This tutorial builds a two-agent ADK system. A DataAgent queries BigQuery through an MCP server to find anomalies. An AlertAgent sends notifications through a second MCP server when anomalies exceed thresholds. A supervisor agent coordinates the two.
BigQuery MCP Server
# bq_server.py
from mcp.server.fastmcp import FastMCP
from google.cloud import bigquery
import os
mcp = FastMCP("bq-monitor")
client = bigquery.Client(project=os.environ["GCP_PROJECT_ID"])
@mcp.tool()
def get_daily_anomalies(threshold_pct: float = 20.0) -> dict:
"""Find product categories where today's revenue dropped more than threshold_pct vs 7-day avg."""
sql = """
WITH avg_7d AS (
SELECT product_category,
AVG(daily_rev) AS avg_rev
FROM (
SELECT product_category,
DATE(order_date) AS d,
SUM(amount_rs) AS daily_rev
FROM `shopmax.orders`
WHERE order_date BETWEEN DATE_SUB(CURRENT_DATE(), INTERVAL 8 DAY)
AND DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)
GROUP BY product_category, d
)
GROUP BY product_category
),
today AS (
SELECT product_category, SUM(amount_rs) AS today_rev
FROM `shopmax.orders`
WHERE DATE(order_date) = CURRENT_DATE()
GROUP BY product_category
)
SELECT t.product_category,
t.today_rev,
a.avg_rev,
ROUND((t.today_rev - a.avg_rev) / a.avg_rev * 100, 1) AS change_pct
FROM today t JOIN avg_7d a USING (product_category)
WHERE (t.today_rev - a.avg_rev) / a.avg_rev * 100 < -@threshold
ORDER BY change_pct
"""
job_config = bigquery.QueryJobConfig(
query_parameters=[bigquery.ScalarQueryParameter("threshold", "FLOAT64", threshold_pct)]
)
rows = client.query(sql, job_config=job_config).result()
return {"anomalies": [dict(r) for r in rows]}
if __name__ == "__main__":
mcp.run(transport="stdio")
Notification MCP Server
# notify_server.py
from mcp.server.fastmcp import FastMCP
import smtplib
from email.mime.text import MIMEText
import os
mcp = FastMCP("notifier")
@mcp.tool()
def send_alert(subject: str, body: str, recipient: str) -> dict:
"""Send an email alert."""
msg = MIMEText(body)
msg["Subject"] = subject
msg["From"] = os.environ["ALERT_FROM_EMAIL"]
msg["To"] = recipient
with smtplib.SMTP_SSL("smtp.gmail.com", 465) as smtp:
smtp.login(os.environ["ALERT_FROM_EMAIL"], os.environ["ALERT_EMAIL_PASSWORD"])
smtp.send_message(msg)
return {"status": "sent", "recipient": recipient}
if __name__ == "__main__":
mcp.run(transport="stdio")
Multi-Agent Coordinator
# coordinator.py
import asyncio
from google.adk.agents import LlmAgent
from google.adk.tools.mcp_tool.mcp_toolset import MCPToolset, StdioServerParameters
from google.adk.sessions import InMemorySessionService
from google.adk.runners import Runner
from google.genai import types
async def main():
bq_tools, bq_stack = await MCPToolset.from_server(
connection_params=StdioServerParameters(command="python", args=["bq_server.py"])
)
notify_tools, notify_stack = await MCPToolset.from_server(
connection_params=StdioServerParameters(command="python", args=["notify_server.py"])
)
data_agent = LlmAgent(
model="gemini-2.0-flash",
name="DataAgent",
instruction="Use get_daily_anomalies to find revenue drops greater than 20%. Return the full list.",
tools=bq_tools,
)
alert_agent = LlmAgent(
model="gemini-2.0-flash",
name="AlertAgent",
instruction="You receive anomaly data. Use send_alert to email [email protected] with a summary.",
tools=notify_tools,
)
supervisor = LlmAgent(
model="gemini-2.0-flash",
name="Supervisor",
instruction="""
Coordinate the monitoring workflow:
1. Ask DataAgent to find anomalies.
2. If anomalies exist, pass the data to AlertAgent to send an email alert.
3. Report what was done.
""",
sub_agents=[data_agent, alert_agent],
)
session_service = InMemorySessionService()
await session_service.create_session(app_name="monitor", user_id="u1", session_id="s1")
runner = Runner(agent=supervisor, app_name="monitor", session_service=session_service)
content = types.Content(role="user", parts=[types.Part(text="Run the daily anomaly check.")])
async for event in runner.run_async(user_id="u1", session_id="s1", new_message=content):
if event.is_final_response():
print(event.content.parts[0].text)
await bq_stack.aclose()
await notify_stack.aclose()
asyncio.run(main())
Architecture Summary
- Supervisor delegates data retrieval to DataAgent (BigQuery MCP)
- Supervisor delegates alerting to AlertAgent (Notification MCP)
- Each agent has its own MCPToolset connected to a different server
- Two MCP processes run simultaneously via separate stdio connections