Detailed Project Description

This project demonstrates an innovative multi-agent system built with the Agent Development Kit (ADK) that automates the Extract, Transform, Load (ETL) process for CSV data and subsequently generates SQL views for dashboard creation. The system intelligently routes user requests and orchestrates specialized agents to handle different stages of the data pipeline, ultimately delivering actionable insights.
Problem Statement: Data analysis often involves repetitive and complex tasks like data cleaning, type inference, and preparing data for visualization. Manual execution of these steps is time-consuming and prone to errors. This project aims to streamline this process by leveraging a multi-agent architecture.
Solution Overview: Our system employs a hierarchical multi-agent approach. A central main_coordinator_agent (Root Agent) acts as a router, directing user requests to specialized sub-agents. These sub-agents then collaboratively perform ETL operations, including data type identification, cleaning, and loading into a silver table. Finally, a data_view_agent generates analytical SQL views suitable for dashboarding.
Innovation: The key innovation lies in the intelligent orchestration of specialized agents for distinct tasks within the data pipeline. This modular approach allows for:
- Scalability: New agents can be easily integrated for additional functionality (e.g., advanced analytics, different data sources).
- Robustness: Each agent focuses on a specific problem domain, reducing the complexity of individual agents and improving overall system reliability.
- Automation: The system automates tedious data preparation steps, freeing data analysts for higher-value tasks.
- Intuitive Data Handling: The data_cleaning_agent and data_type_agent demonstrate "intuitive data analysis" by inferring types and handling cleaning without explicit user-defined rules, making the process more adaptive.

Agent Roles and Interactions:
main_coordinator_agent (DataRoutingAgent):
- Role: Task Delegation Specialist.
- Objective: Routes incoming user requests to the appropriate specialized agent based on content analysis.
- Key Interactions: Receives user input (e.g., "name for this session," "analyze the data"). Delegates session-naming requests to session_naming_agent and data analysis/processing requests to data_pipeline_agent. Utilizes google.adk.agents.BaseAgent for its foundation.

session_naming_agent (LlmAgent):
- Role: Naming Agent.
- Objective: Creates unique and descriptive session names for data processing jobs based on a subset of the 'bronze' table data.
- Key Interactions: Called by main_coordinator_agent. Uses the get_database_data_subset tool to obtain data for naming. Returns only the generated name.

data_pipeline_agent (DataPipelineAgent):
- Role: ETL Orchestrator.
- Objective: Manages the flow of data from raw ('bronze') to cleaned ('silver') and then to analytical views ('gold').
- Key Interactions: Called by main_coordinator_agent. Initiates the process by fetching 'bronze' data, invokes data_type_agent to create the 'silver' table, manages batch processing by calling data_cleaning_agent for each batch, and calls data_view_agent to create analytical views. Uses the get_database_data_count, get_database_data, and self_execute_script tools.

data_type_agent (LlmAgent):
- Role: PostgreSQL Data Type Identification Specialist.
- Objective: Analyzes data samples from the 'bronze' table to infer optimal PostgreSQL column types and generates CREATE TABLE scripts for the {schema}.silver table.
- Key Interactions: Called by data_pipeline_agent. Receives bronze_data and schema from the session state. Infers data types based on a predefined promotion hierarchy (BOOLEAN → INTEGER → BIGINT → NUMERIC → TEXT; DATE → TIMESTAMP → TEXT). Generates and executes DROP TABLE IF EXISTS and CREATE TABLE statements using the execute_script tool, with retry logic that promotes types on errors.

data_cleaning_agent (LlmAgent):
- Role: Intuitive Data Cleaning & SQL Generation Specialist.
- Objective: Cleans batch data from the batch_data state key and generates INSERT statements for {schema}.silver.
- Key Interactions: Called by data_pipeline_agent for each data batch. Receives schema and batch_data from the session state. Performs cleaning operations: trimming whitespace, normalizing nulls ('', 'NA', 'null' to NULL), escaping special characters, and applying type-specific cleaning (text truncation, numeric conversion, date formatting, boolean conversion). Generates raw SQL INSERT statements, processing up to 50 rows per batch.

data_view_agent (LlmAgent):
- Role: Business Intelligence Transformation Specialist.
- Objective: Converts cleaned data in {schema}.silver into actionable insights through KPI performance tracking, statistical anomaly detection, and visualization-ready aggregated views.
- Key Interactions: Called by data_pipeline_agent after data cleaning. Receives schema and cleaned_batch_data for analysis. Identifies primary KPIs, date columns, anomaly measures, and graph elements, then generates SQL CREATE OR REPLACE VIEW statements for three KPI target views, one anomaly detection view, and five graph-ready views, executing the scripts with the execute_script tool.

ADK Utilization:

The project heavily leverages the Google Agent Development Kit through:

- LlmAgent and BaseAgent: The specialized agents (session_naming_agent, data_type_agent, data_cleaning_agent, data_view_agent) are LlmAgent instances, inheriting capabilities for interacting with large language models, while DataRoutingAgent and DataPipelineAgent extend BaseAgent for custom routing and orchestration logic.
- InvocationContext and Session State: Agents communicate and share data (schema, bronze_data, batch_data, cleaned_batch_data) through the InvocationContext's session state, enabling a cohesive workflow.
- Tools: The agents integrate custom tools (get_database_data_count, get_database_data, self_execute_script, execute_script, get_database_data_subset) to interact with external systems like the database.
- Runner: The Runner orchestrates execution of the root_agent (main coordinator), managing sessions and agent interactions.
- Model Integration: Agents use constants.MODEL_GEMINI_2_0_FLASH for their underlying LLM capabilities.

Google Cloud Integration (Potential): The provided code uses psycopg2 for PostgreSQL connectivity (which can run anywhere), but the google.adk library is designed to integrate seamlessly with Google Cloud services. In a full deployment, the PostgreSQL database could reside on Cloud SQL, the agents could be deployed as Cloud Run services or within Google Kubernetes Engine, and Vertex AI could serve the underlying Gemini models. Session storage is currently InMemorySessionService, but DatabaseSessionService (commented out in agent.py) could be used with a Cloud SQL instance.
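The routing behaviour of main_coordinator_agent described above can be approximated with a simple keyword check. This is a minimal, library-free sketch: the agent names come from this project, but the `route_request` function and its keyword lists are illustrative assumptions, not the actual BaseAgent routing logic.

```python
def route_request(user_message: str) -> str:
    """Pick the sub-agent that should handle a user request.

    Illustrative heuristic only; the real DataRoutingAgent delegates
    based on its own content analysis, not this exact rule set.
    """
    text = user_message.lower()
    # Session-naming requests go to the naming agent.
    if "name" in text and "session" in text:
        return "session_naming_agent"
    # Requests about analysing/processing data go to the ETL pipeline.
    if any(word in text for word in ("analyze", "analyse", "process", "data")):
        return "data_pipeline_agent"
    # Otherwise the coordinator keeps handling the conversation.
    return "main_coordinator_agent"

print(route_request("Please pick a name for this session"))  # session_naming_agent
print(route_request("analyze the data"))                     # data_pipeline_agent
```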
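The bronze-to-silver batch flow that data_pipeline_agent manages (count the rows, fetch batches, clean each batch) can be sketched as a plain orchestration loop. The stub lambdas below stand in for the get_database_data_count / get_database_data tools and the data_cleaning_agent call; their signatures are assumptions for illustration.

```python
def run_pipeline(get_count, get_batch, clean_batch, batch_size: int = 50):
    """Orchestration skeleton: fetch source rows in batches of `batch_size`
    and hand each batch to the cleaning step, mirroring the way
    data_pipeline_agent drives data_cleaning_agent per batch."""
    total = get_count()
    cleaned = []
    for offset in range(0, total, batch_size):
        batch = get_batch(offset, batch_size)
        cleaned.extend(clean_batch(batch))
    return cleaned

# Stub "tools" standing in for the database tools and the cleaning agent.
rows = [{"id": i} for i in range(120)]
result = run_pipeline(
    get_count=lambda: len(rows),
    get_batch=lambda off, n: rows[off:off + n],
    clean_batch=lambda batch: batch,  # identity stand-in for data_cleaning_agent
)
print(len(result))  # 120 rows processed across 3 batches of up to 50
```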
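The type-promotion hierarchy that data_type_agent applies (BOOLEAN → INTEGER → BIGINT → NUMERIC → TEXT, and DATE → TIMESTAMP → TEXT) can be expressed as a walk over sample values. This is a sketch under stated assumptions: the boolean token set, the date formats, and the null tokens are illustrative choices, not the agent's actual prompt rules.

```python
from datetime import datetime

BOOL_TOKENS = {"true", "false", "t", "f", "yes", "no"}  # assumed token set
NULL_TOKENS = {"", "NA", "null"}

def _parses(value: str, fmt: str) -> bool:
    try:
        datetime.strptime(value, fmt)
        return True
    except ValueError:
        return False

def infer_pg_type(samples: list) -> str:
    """Infer a PostgreSQL column type by walking the promotion hierarchy
    BOOLEAN -> INTEGER -> BIGINT -> NUMERIC -> TEXT (dates: DATE ->
    TIMESTAMP -> TEXT) over the non-null sample values."""
    values = [str(s).strip() for s in samples
              if str(s).strip() not in NULL_TOKENS]
    if not values:
        return "TEXT"
    if all(v.lower() in BOOL_TOKENS for v in values):
        return "BOOLEAN"
    try:
        ints = [int(v) for v in values]
        # Promote to BIGINT when any value overflows 32-bit INTEGER.
        return "INTEGER" if all(-2**31 <= i < 2**31 for i in ints) else "BIGINT"
    except ValueError:
        pass
    try:
        [float(v) for v in values]
        return "NUMERIC"
    except ValueError:
        pass
    if all(_parses(v, "%Y-%m-%d") for v in values):
        return "DATE"
    if all(_parses(v, "%Y-%m-%d %H:%M:%S") for v in values):
        return "TIMESTAMP"
    return "TEXT"

print(infer_pg_type(["12", "7", "NA"]))             # INTEGER
print(infer_pg_type(["3000000000", "1"]))           # BIGINT
print(infer_pg_type(["1.5", "2"]))                  # NUMERIC
print(infer_pg_type(["2024-01-31", "2024-02-01"]))  # DATE
```

On a parse error during loading, retry logic like the agent's can simply re-run inference with the offending value included, which naturally promotes the column further down the hierarchy.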
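The cleaning rules data_cleaning_agent applies (trim whitespace, map '', 'NA', 'null' to NULL, escape quotes, truncate text, batch up to 50 rows per INSERT) can be sketched as below. For simplicity every non-null value is emitted as a quoted literal; the real agent applies type-specific formatting, and the 255-character truncation limit is an assumption.

```python
NULL_TOKENS = {"", "NA", "null"}

def clean_value(raw, max_text_len: int = 255) -> str:
    """Trim whitespace, normalize null-ish tokens to SQL NULL,
    truncate long text, and escape single quotes."""
    if raw is None:
        return "NULL"
    value = str(raw).strip()
    if value in NULL_TOKENS:
        return "NULL"
    value = value[:max_text_len].replace("'", "''")  # escape quotes for SQL
    return f"'{value}'"

def build_insert(schema: str, rows: list, columns: list,
                 batch_size: int = 50) -> str:
    """Emit one multi-row INSERT into {schema}.silver for up to
    `batch_size` cleaned rows."""
    tuples = ", ".join(
        "(" + ", ".join(clean_value(v) for v in row) + ")"
        for row in rows[:batch_size]
    )
    return f"INSERT INTO {schema}.silver ({', '.join(columns)}) VALUES {tuples};"

sql = build_insert("demo", [["  Ada ", "NA"], ["O'Brien", "41"]], ["name", "age"])
print(sql)
# INSERT INTO demo.silver (name, age) VALUES ('Ada', NULL), ('O''Brien', '41');
```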
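Finally, the kind of CREATE OR REPLACE VIEW statement that data_view_agent emits for a KPI view can be sketched as a template. The view and column names here (gold_kpi_revenue, revenue, order_date) are hypothetical placeholders, and the daily-SUM aggregation is one plausible shape, not the agent's exact output.

```python
def kpi_view_sql(schema: str, view_name: str,
                 kpi_column: str, date_column: str) -> str:
    """Build one CREATE OR REPLACE VIEW statement that aggregates a KPI
    by day over the cleaned {schema}.silver table."""
    return (
        f"CREATE OR REPLACE VIEW {schema}.{view_name} AS\n"
        f"SELECT {date_column}::date AS day,\n"
        f"       SUM({kpi_column}) AS total_{kpi_column}\n"
        f"FROM {schema}.silver\n"
        f"GROUP BY {date_column}::date\n"
        f"ORDER BY day;"
    )

print(kpi_view_sql("demo", "gold_kpi_revenue", "revenue", "order_date"))
```

Repeating this template with different measures and groupings yields the three KPI views, one anomaly view, and five graph-ready views described above, each executed via the execute_script tool.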
Potential Impact/Future Work: This system can significantly reduce the manual effort in data analysis. Future work could include:
- Support for more data sources: Expanding beyond CSV to include other formats like JSON, Parquet, or direct API integrations.
- User interface: Developing a user-friendly interface for interacting with the system and visualizing the generated dashboards.
- Advanced analytics agents: Integrating agents for predictive modeling, machine learning, or more sophisticated statistical analysis.
- Automated dashboard generation: Direct integration with BI tools like Looker Studio or Power BI to automatically create dashboards from the generated views.
Built With
- c#
- cloud-run
- postgresql
- python
- rabbitmq
- react