tl  tr
  Home | Tutorials | Articles | Videos | Products | Tools | Search
Interviews | Open Source | Tag Cloud | Follow Us | Bookmark | Contact   
 Agentic AI > MCP Protocol > ADK Multi-Agent with BigQuery and Notification MCP

ADK Multi-Agent with BigQuery and Notification MCP

Author: Venkata Sudhakar

ADK Multi-Agent with BigQuery and Notification MCP

This tutorial builds a two-agent ADK system. A DataAgent queries BigQuery through an MCP server to find anomalies. An AlertAgent sends notifications through a second MCP server when anomalies exceed thresholds. A supervisor agent coordinates the two.

BigQuery MCP Server

# bq_server.py
from mcp.server.fastmcp import FastMCP
from google.cloud import bigquery
import os

# One FastMCP server instance; tools registered below are exposed over MCP.
mcp = FastMCP("bq-monitor")
# BigQuery client bound to the project named in GCP_PROJECT_ID.
# NOTE: os.environ[...] raises KeyError at import time if the variable is unset.
client = bigquery.Client(project=os.environ["GCP_PROJECT_ID"])

@mcp.tool()
def get_daily_anomalies(threshold_pct: float = 20.0) -> dict:
    """Find product categories whose revenue today dropped more than
    ``threshold_pct`` percent below their trailing 7-day daily average.

    Args:
        threshold_pct: Minimum percentage drop (as a positive number)
            required for a category to be reported. Defaults to 20.0.

    Returns:
        ``{"anomalies": [...]}`` where each entry carries
        ``product_category``, ``today_rev``, ``avg_rev`` and ``change_pct``
        (negative means a drop), ordered worst drop first.
    """
    # The trailing window is days -8 .. -1 so today's partial figures never
    # contaminate the baseline average.
    sql = """
        WITH avg_7d AS (
            SELECT product_category,
                   AVG(daily_rev) AS avg_rev
            FROM (
                SELECT product_category,
                       DATE(order_date) AS d,
                       SUM(amount_rs) AS daily_rev
                FROM `shopmax.orders`
                WHERE order_date BETWEEN DATE_SUB(CURRENT_DATE(), INTERVAL 8 DAY)
                                     AND DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)
                GROUP BY product_category, d
            )
            GROUP BY product_category
        ),
        today AS (
            SELECT product_category, SUM(amount_rs) AS today_rev
            FROM `shopmax.orders`
            WHERE DATE(order_date) = CURRENT_DATE()
            GROUP BY product_category
        )
        SELECT t.product_category,
               t.today_rev,
               a.avg_rev,
               ROUND((t.today_rev - a.avg_rev) / a.avg_rev * 100, 1) AS change_pct
        FROM today t JOIN avg_7d a USING (product_category)
        -- avg_rev > 0 guards against a division-by-zero error for categories
        -- with no revenue in the trailing window.
        WHERE a.avg_rev > 0
          AND (t.today_rev - a.avg_rev) / a.avg_rev * 100 < -@threshold
        ORDER BY change_pct
    """
    # Parameterized query: threshold is bound server-side rather than
    # interpolated into the SQL string.
    job_config = bigquery.QueryJobConfig(
        query_parameters=[bigquery.ScalarQueryParameter("threshold", "FLOAT64", threshold_pct)]
    )
    rows = client.query(sql, job_config=job_config).result()
    # Row objects are mapping-like, so dict(r) yields JSON-serializable dicts.
    return {"anomalies": [dict(r) for r in rows]}

if __name__ == "__main__":
    # Serve over stdio so a parent process (the ADK MCPToolset) can spawn
    # this script and speak MCP across stdin/stdout.
    mcp.run(transport="stdio")

Notification MCP Server

# notify_server.py
from mcp.server.fastmcp import FastMCP
import smtplib
from email.mime.text import MIMEText
import os

# FastMCP server instance exposing the notification tool over MCP.
mcp = FastMCP("notifier")

@mcp.tool()
def send_alert(subject: str, body: str, recipient: str) -> dict:
    """Send an email alert over SMTP-over-SSL.

    Args:
        subject: Email subject line.
        body: Plain-text message body.
        recipient: Destination email address.

    Returns:
        ``{"status": "sent", "recipient": recipient}`` on success.

    Raises:
        KeyError: if ALERT_FROM_EMAIL or ALERT_EMAIL_PASSWORD is unset.
        smtplib.SMTPException: on login or delivery failure.
    """
    # Explicit utf-8 charset: MIMEText defaults to us-ascii and would raise
    # UnicodeEncodeError on non-ASCII content (e.g. currency symbols in the
    # anomaly summary).
    msg = MIMEText(body, "plain", "utf-8")
    sender = os.environ["ALERT_FROM_EMAIL"]
    msg["Subject"] = subject
    msg["From"] = sender
    msg["To"] = recipient
    # Host/port are overridable via SMTP_HOST / SMTP_PORT, defaulting to the
    # original Gmail SSL endpoint so existing deployments are unaffected.
    host = os.environ.get("SMTP_HOST", "smtp.gmail.com")
    port = int(os.environ.get("SMTP_PORT", "465"))
    with smtplib.SMTP_SSL(host, port) as smtp:
        smtp.login(sender, os.environ["ALERT_EMAIL_PASSWORD"])
        smtp.send_message(msg)
    return {"status": "sent", "recipient": recipient}

if __name__ == "__main__":
    # Serve over stdio so a parent process (the ADK MCPToolset) can spawn
    # this script and speak MCP across stdin/stdout.
    mcp.run(transport="stdio")

Multi-Agent Coordinator

# coordinator.py
import asyncio
from google.adk.agents import LlmAgent
from google.adk.tools.mcp_tool.mcp_toolset import MCPToolset, StdioServerParameters
from google.adk.sessions import InMemorySessionService
from google.adk.runners import Runner
from google.genai import types

async def main():
    """Wire up both MCP servers, run the supervisor once, then clean up.

    Spawns the BigQuery and notification MCP servers as stdio subprocesses,
    builds the two worker agents plus a supervisor that delegates to them,
    runs one monitoring pass, and prints the final response.
    """
    bq_tools, bq_stack = await MCPToolset.from_server(
        connection_params=StdioServerParameters(command="python", args=["bq_server.py"])
    )
    # try/finally so both child MCP processes are always shut down, even when
    # the run fails partway — the original success-path-only aclose() calls
    # leaked subprocesses on any exception. Stacks close in reverse order of
    # acquisition.
    try:
        notify_tools, notify_stack = await MCPToolset.from_server(
            connection_params=StdioServerParameters(command="python", args=["notify_server.py"])
        )
        try:
            data_agent = LlmAgent(
                model="gemini-2.0-flash",
                name="DataAgent",
                instruction="Use get_daily_anomalies to find revenue drops greater than 20%. Return the full list.",
                tools=bq_tools,
            )

            alert_agent = LlmAgent(
                model="gemini-2.0-flash",
                name="AlertAgent",
                instruction="You receive anomaly data. Use send_alert to email [email protected] with a summary.",
                tools=notify_tools,
            )

            # The supervisor owns no tools; it routes work to its sub-agents.
            supervisor = LlmAgent(
                model="gemini-2.0-flash",
                name="Supervisor",
                instruction="""
Coordinate the monitoring workflow:
1. Ask DataAgent to find anomalies.
2. If anomalies exist, pass the data to AlertAgent to send an email alert.
3. Report what was done.
""",
                sub_agents=[data_agent, alert_agent],
            )

            session_service = InMemorySessionService()
            await session_service.create_session(app_name="monitor", user_id="u1", session_id="s1")
            runner = Runner(agent=supervisor, app_name="monitor", session_service=session_service)

            content = types.Content(role="user", parts=[types.Part(text="Run the daily anomaly check.")])
            async for event in runner.run_async(user_id="u1", session_id="s1", new_message=content):
                if event.is_final_response():
                    print(event.content.parts[0].text)
        finally:
            await notify_stack.aclose()
    finally:
        await bq_stack.aclose()

# Guard the entry point so importing this module doesn't trigger a run —
# consistent with the __main__ guards in bq_server.py and notify_server.py.
if __name__ == "__main__":
    asyncio.run(main())

Architecture Summary

  • Supervisor delegates data retrieval to DataAgent (BigQuery MCP)
  • Supervisor delegates alerting to AlertAgent (Notification MCP)
  • Each agent has its own MCPToolset connected to a different server
  • Two MCP processes run simultaneously via separate stdio connections

 
  


  
bl  br