tl  tr
  Home | Tutorials | Articles | Videos | Products | Tools | Search
Interviews | Open Source | Tag Cloud | Follow Us | Bookmark | Contact   
 Agentic AI > MCP Protocol > BigQuery Analytics Agent with MCP

BigQuery Analytics Agent with MCP

Author: Venkata Sudhakar

BigQuery Analytics Agent with MCP

In this tutorial we build a richer analytics agent that goes beyond single queries. The agent uses multiple MCP tools to analyze monthly revenue trends, compare category revenue year over year, rank cities by order volume, and combine the results into a summary report from BigQuery data.

MCP Server with Analytics Tools

# bq_analytics_server.py
from mcp.server.fastmcp import FastMCP
from google.cloud import bigquery
import os
from datetime import date, timedelta

# MCP server instance; the @mcp.tool() decorators in this script register
# their functions on it.
mcp = FastMCP("bq-analytics")
# Single BigQuery client shared by the tool functions. The project id is read
# with os.environ[...], so a missing GCP_PROJECT_ID raises KeyError as soon as
# this module is loaded.
client = bigquery.Client(project=os.environ["GCP_PROJECT_ID"])

@mcp.tool()
def revenue_by_month(year: int) -> dict:
    """Return monthly revenue totals for a given year.

    Only orders whose status is 'COMPLETED' are summed.

    Args:
        year: Calendar year matched against ``order_date``; bound to the
            query as the INT64 parameter ``@year``.

    Returns:
        A dict mapping each ``YYYY-MM`` month label to the summed
        ``amount_rs`` for that month as a float, inserted in ascending
        month order.
    """
    query = """
        SELECT FORMAT_DATE('%Y-%m', order_date) AS month,
               SUM(amount_rs) AS revenue
        FROM `shopmax.orders`
        WHERE EXTRACT(YEAR FROM order_date) = @year
          AND status = 'COMPLETED'
        GROUP BY month
        ORDER BY month
    """
    # Bind the year as a typed parameter instead of formatting it into the SQL.
    year_param = bigquery.ScalarQueryParameter("year", "INT64", year)
    config = bigquery.QueryJobConfig(query_parameters=[year_param])

    totals = {}
    for row in client.query(query, job_config=config).result():
        totals[row["month"]] = float(row["revenue"])
    return totals

@mcp.tool()
def category_growth(current_year: int) -> dict:
    """Compare category revenue between current year and previous year.

    Only orders whose status is 'COMPLETED' are included.

    Args:
        current_year: Year treated as "current"; the comparison year is
            ``current_year - 1``.

    Returns:
        ``{"categories": [...]}`` where each entry holds ``category``,
        ``current_yr`` and ``prev_yr`` revenue as floats, and ``growth_pct``
        (percentage change rounded to one decimal, or ``None`` when the
        previous year's revenue is zero). Entries are ordered by current-year
        revenue, highest first.
    """
    query = """
        SELECT product_category,
               SUM(IF(EXTRACT(YEAR FROM order_date) = @cur, amount_rs, 0)) AS current_yr,
               SUM(IF(EXTRACT(YEAR FROM order_date) = @prev, amount_rs, 0)) AS prev_yr
        FROM `shopmax.orders`
        WHERE status = 'COMPLETED'
          AND EXTRACT(YEAR FROM order_date) IN (@cur, @prev)
        GROUP BY product_category
        ORDER BY current_yr DESC
    """
    year_params = [
        bigquery.ScalarQueryParameter("cur", "INT64", current_year),
        bigquery.ScalarQueryParameter("prev", "INT64", current_year - 1),
    ]
    config = bigquery.QueryJobConfig(query_parameters=year_params)

    categories = []
    for row in client.query(query, job_config=config).result():
        this_year = float(row["current_yr"])
        last_year = float(row["prev_yr"])
        if last_year:
            pct_change = round((this_year - last_year) / last_year * 100, 1)
        else:
            # No baseline revenue to divide by, so growth is undefined.
            pct_change = None
        categories.append(
            {
                "category": row["product_category"],
                "current_yr": this_year,
                "prev_yr": last_year,
                "growth_pct": pct_change,
            }
        )
    return {"categories": categories}

@mcp.tool()
def top_cities(limit: int = 5) -> dict:
    """Return top cities by total order count.

    The query applies no status filter, so every row in the orders table
    counts toward a city's total.

    Args:
        limit: Maximum number of cities to return; bound to the query as the
            INT64 parameter ``@limit``.

    Returns:
        ``{"cities": [...]}`` with one dict per result row (``city`` and
        ``orders`` columns), ordered by order count, highest first.
    """
    query = """
        SELECT city, COUNT(*) AS orders
        FROM `shopmax.orders`
        GROUP BY city ORDER BY orders DESC LIMIT @limit
    """
    limit_param = bigquery.ScalarQueryParameter("limit", "INT64", limit)
    config = bigquery.QueryJobConfig(query_parameters=[limit_param])
    result_rows = client.query(query, job_config=config).result()
    # Convert each result row into a plain dict before returning it.
    return {"cities": list(map(dict, result_rows))}

# Start the MCP server over the stdio transport only when this file is run as
# a script, not when it is imported.
if __name__ == "__main__":
    mcp.run(transport="stdio")

Analytics Agent

# analytics_agent.py
import asyncio
from google.adk.agents import LlmAgent
from google.adk.tools.mcp_tool.mcp_toolset import MCPToolset, StdioServerParameters
from google.adk.sessions import InMemorySessionService
from google.adk.runners import Runner
from google.genai import types

async def main():
    """Run one report-generation turn against the BigQuery analytics MCP server.

    Launches ``bq_analytics_server.py`` as a stdio MCP server, passes its
    tools to an ``LlmAgent``, sends a single report prompt in an in-memory
    session, and prints the text of the agent's final response. The MCP
    connection is closed when the function exits, whether or not the run
    succeeded.
    """
    # NOTE(review): MCPToolset.from_server is the older ADK entry point;
    # confirm it still exists in the installed google-adk version.
    tools, exit_stack = await MCPToolset.from_server(
        connection_params=StdioServerParameters(
            command="python", args=["bq_analytics_server.py"]
        )
    )

    # Everything after the connection is opened runs under try/finally so the
    # server subprocess is released even if agent setup or the run raises.
    try:
        agent = LlmAgent(
            model="gemini-2.0-flash",
            # ADK validates agent names as Python identifiers, so the name
            # must not contain spaces.
            name="analytics_agent",
            instruction="""
You are a business analytics agent. Use the available tools to answer questions
about sales trends, category performance, and city rankings.
When asked for a report, call all relevant tools and synthesize the findings.
""",
            tools=tools,
        )

        session_service = InMemorySessionService()
        await session_service.create_session(app_name="analytics", user_id="u1", session_id="s1")
        runner = Runner(agent=agent, app_name="analytics", session_service=session_service)

        prompt = "Generate a business performance report for 2024 including monthly trends, category growth vs 2023, and top cities."
        content = types.Content(role="user", parts=[types.Part(text=prompt)])

        async for event in runner.run_async(user_id="u1", session_id="s1", new_message=content):
            if not event.is_final_response():
                continue
            # Guard against a final event that carries no content parts
            # instead of failing on the attribute/index access.
            if event.content and event.content.parts:
                print(event.content.parts[0].text)
    finally:
        await exit_stack.aclose()

# Run the agent only when this file is executed as a script, so importing the
# module does not start an agent run as a side effect.
if __name__ == "__main__":
    asyncio.run(main())

What the Agent Does

  1. Calls revenue_by_month(2024) to get monthly revenue trend
  2. Calls category_growth(2024) to compare 2024 vs 2023 by category
  3. Calls top_cities(5) to get city rankings
  4. Synthesizes all three results into a structured report

Extending the Server

Add more tools to the MCP server for deeper analytics, such as customer retention rate, cohort analysis, or product affinity. Because the agent loads its tool list from the MCP server at startup, it discovers and can use new tools without any code changes on the agent side.


 
  


  
bl  br