AWS Big Data Blog

Orchestrate data processing jobs, querybooks, and notebooks using visual workflow experience in Amazon SageMaker

Automation of data processing and data integration tasks and queries is essential for data engineers and analysts to maintain up-to-date data pipelines and reports. Amazon SageMaker Unified Studio is a single data and AI development environment where you can find and access the data in your organization and act on it using the ideal tools for your use case. SageMaker Unified Studio offers multiple ways to integrate with data through the Visual ETL, Query Editor, and JupyterLab builders. SageMaker Unified Studio is natively integrated with Apache Airflow and Amazon Managed Workflows for Apache Airflow (Amazon MWAA), which you can use to automate workflow orchestration for jobs, querybooks, and notebooks with a Python-based Directed Acyclic Graph (DAG) definition.

Today, we are excited to launch a new visual workflows builder in SageMaker Unified Studio. With the new visual workflow experience, you don’t need to code the Python DAGs manually. Instead, you can visually define the orchestration workflow in SageMaker Unified Studio, and the visual definition is automatically converted to a Python DAG definition that is supported in Airflow. This post demonstrates the new visual workflow experience in SageMaker Unified Studio.

Example use case

In this post, a fictional ecommerce company sells many different products, like books, toys, and jewelry. Customers can leave reviews and star ratings for each product so other customers can make informed decisions about what to buy. We use a sample synthetic review dataset for demonstration purposes, which includes different products and customer reviews.

In this example, we demonstrate the new visual workflow experience with a data processing job, SQL querybook, and notebook. We also identify the top 10 customers who have contributed the most helpful votes per category.

The following diagram illustrates the solution architecture.

In the following sections, we show how to configure a series of components using data processing jobs, querybooks, and notebooks with SageMaker Unified Studio visual workflows. We use sample data to extract information for a specific category, update partition metadata, and display query results in a notebook using Python code.

Prerequisites

To get started, you must have the following prerequisites:

  • An AWS account
  • A SageMaker Unified Studio domain. To use the sample data provided in this post, your domain must be in the us-east-1 Region.
  • A SageMaker Unified Studio project with the Data analytics and AI-ML model development project profile
  • A workflow environment

Create a data processing job

The first step is to create a data processing job to run visual transformations to identify top contributing customers per category. Complete the following steps to create a data processing job:

  1. On the top menu, under Build, choose Visual ETL flow.
  2. Choose the plus sign, and under Data sources, choose Amazon S3.
  3. Choose the Amazon S3 source node and enter the following values:
    1. S3 URI: s3://aws-bigdata-blog/generated_synthetic_reviews/data/
    2. Format: Parquet
  4. Choose Update node.
  5. Choose the plus sign, and under Transform, choose Filter.
  6. Choose the Filter node and enter the following values:
    1. Filter Type: Global AND
    2. Key: product_category
    3. Operation: ==
    4. Value: Books
  7. Choose Update node.
  8. Choose the plus sign, and under Data targets, choose Amazon S3.
  9. Choose the S3 node and enter the following values:
    1. S3 URI: Use the Amazon S3 location from the project overview page and add the suffix /data/books_synthetic_reviews/ (for example, /dzd_al0ii4pi2sqv68/awi0lzjswu0yhc/dev/data/books_synthetic_reviews/)
    2. Format: Parquet
    3. Compression: Snappy
    4. Partition keys: marketplace
    5. Mode: Overwrite
    6. Update Catalog: True
    7. Database: Choose your database
    8. Table: books_synthetic_review
    9. Include header: False
  10. Choose Update node.

At this point, you should have an end-to-end visual flow. Now you can publish it.

  1. Choose Save to project to save the draft flow.
  2. Change Job name to filter-books-synthetic-review, then choose Update.

The data processing job has been successfully created.
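
For reference, the visual flow you just built is roughly equivalent to the following PySpark sketch, which reads the review dataset, keeps only the Books category, and writes partitioned Parquet output. This is a minimal illustration using a placeholder output location, not the code that the visual ETL builder actually generates, and it doesn't show the catalog update that the Update Catalog option performs.

# Minimal PySpark sketch of the visual flow (illustration only; not the code
# generated by the visual ETL builder). The output URI is a placeholder.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("filter-books-synthetic-review").getOrCreate()

# Source: the public synthetic reviews dataset in Parquet format
reviews = spark.read.parquet("s3://aws-bigdata-blog/generated_synthetic_reviews/data/")

# Filter: keep only the Books product category
books = reviews.filter(F.col("product_category") == "Books")

# Target: Snappy-compressed Parquet, partitioned by marketplace, overwrite mode
(books.write
    .mode("overwrite")
    .partitionBy("marketplace")
    .option("compression", "snappy")
    .parquet("s3://<your-project-location>/data/books_synthetic_reviews/"))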

Create a querybook

Complete the following steps to create a querybook that runs a SQL query against the source table to recognize its partitions:

  1. Choose the plus sign next to the querybook tab to open a new querybook.
  2. Enter the following query and choose Save to project. The MSCK REPAIR TABLE statement adds the table's partitions to the catalog metadata. Don't run the querybook yet, because it's designed to be triggered by the workflow.

MSCK REPAIR TABLE `books_synthetic_review`;

  3. For Querybook title, enter QueryBook-synthetic-review-<timestamp>, then choose Save changes.

The querybook to recognize new partitions has been successfully created.
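
If you want to confirm later that the partitions were registered, you can run an optional check from a notebook using the same PyAthena pattern as in the next section. The following is a sketch only; it reuses the project-derived connection settings shown later in this post.

# Optional check: list the partitions Athena sees after MSCK REPAIR TABLE runs.
from sagemaker_studio import Project
from pyathena import connect

project = Project()
conn = connect(
    s3_staging_dir=f"{project.s3.root}/sys/athena/",
    region_name=project.connection().physical_endpoints[0].aws_region,
)
database = project.connection().catalog().databases[0].name

cursor = conn.cursor()
cursor.execute(f"SHOW PARTITIONS {database}.books_synthetic_review")
for row in cursor.fetchall():
    print(row)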

Create a notebook

Next, we create a notebook to generate output and visualize the results. Complete the following steps:

  1. On the top menu, under Build, choose JupyterLab.
  2. Choose File, New, and Notebook to create a new notebook.
  3. Enter the following code snippets into notebook cells and save them. The code resolves the AWS Region, database, and Amazon S3 staging path from your project automatically:
import sys

# Install PyAthena into the notebook's Python environment
!{sys.executable} -m pip install PyAthena

from sagemaker_studio import Project
from pyathena import connect
import pandas as pd

# Resolve the project's Athena staging location, Region, and default database
project = Project()
s3_path = f'{project.s3.root}/sys/athena/'
region = project.connection().physical_endpoints[0].aws_region
database = project.connection().catalog().databases[0].name

# Connect to Athena through PyAthena
conn = connect(s3_staging_dir=s3_path, region_name=region)

# Query the top 10 customers by total helpful votes in the Books category
print("Top 10 customers by helpful votes, Books category")
df = pd.read_sql(f"""
select customer_id, sum(helpful_votes) helpful_votes_sum
from {database}.books_synthetic_review
group by customer_id
order by sum(helpful_votes) desc
limit 10;
""", conn)
df
  4. Choose File, Save Notebook.
  5. Rename the file to synthetics-review-result.ipynb, and choose Rename and Save.
  6. Choose the Git sidebar and choose the plus sign next to the file name.
  7. Enter the commit message and choose COMMIT.
  8. Choose Push to Remote.

Create a workflow

Complete the following steps to create a workflow:

  1. On the top menu, under Build, choose Workflows.
  2. Choose Create new workflow.
  3. Choose the plus sign, then choose Data processing job.
  4. Choose the Data processing job node, then choose Browse jobs.
  5. Select filter-books-synthetic-review and choose Select.
  6. Choose the plus sign, then choose Querybook.
  7. Choose the Querybook node, then choose Browse files.
  8. Select QueryBook-synthetic-review-<timestamp>.sqlnb and choose Select.
  9. Choose the plus sign, then choose Notebook.
  10. Choose the Notebook node, then choose Browse files.
  11. Select synthetics-review-result.ipynb and choose Select.

At this point, you should have an end-to-end visual workflow. Now you can publish it.

  1. Choose Save to project to save the draft workflow.
  2. Change Workflow name to synthetic-review-workflow and choose Save to project.
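
When you save the workflow, the visual definition is converted into a Python DAG behind the scenes. The following sketch shows only the conceptual shape of such a DAG for this workflow, with the three tasks chained in order; the operators, task IDs, and parameters are placeholders, not the code that SageMaker Unified Studio actually generates.

# Conceptual Airflow DAG for this workflow: data processing job, then querybook,
# then notebook. Placeholder operators only; not generated code.
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="synthetic_review_workflow",  # hypothetical DAG ID
    start_date=datetime(2024, 1, 1),
    schedule=None,  # run on demand, like the manual trigger in this post
    catchup=False,
) as dag:
    data_processing_job = BashOperator(
        task_id="filter_books_synthetic_review",
        bash_command="echo 'placeholder for the data processing job task'",
    )
    querybook = BashOperator(
        task_id="msck_repair_querybook",
        bash_command="echo 'placeholder for the querybook task'",
    )
    notebook = BashOperator(
        task_id="synthetics_review_result_notebook",
        bash_command="echo 'placeholder for the notebook task'",
    )

    data_processing_job >> querybook >> notebook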

Run the workflow

To run your workflow, complete the following steps:

  1. Choose Run on the workflow details page.
  2. Choose View runs to see the running workflow.

When the run is complete, you can check the notebook task result by choosing the run ID (manual__<timestamp>) and then choosing the notebook task ID (notebook-task-xxxx).

You can find the IDs of the top 10 customers who have contributed the most helpful votes in the notebook output.

Clean up

To avoid incurring future charges, clean up the resources you created during this walkthrough:

  1. On the workflows page, select your workflow, and under Actions, choose Delete workflow.
  2. On the Visual ETL flows page, select filter-books-synthetic-review, and under Actions, choose Delete flow.
  3. In Query Editor, enter and run the following SQL to drop the table:

DROP TABLE `books_synthetic_review`;

  4. In JupyterLab, in the File Browser sidebar, choose (right-click) each file (synthetics-review-result.ipynb and QueryBook-synthetic-review-<timestamp>.sqlnb) and choose Delete.
  5. Commit the deletions with Git and push them to the remote repository.
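
The data processing job also wrote Parquet output under the books_synthetic_reviews/ prefix in your project's Amazon S3 location. If you want to remove that data as well, you can delete the prefix, for example with a short boto3 snippet like the following (the bucket name and prefix are placeholders for your project's values).

# Optional: delete the Parquet output written by the data processing job.
# Replace the bucket name and prefix with your project's S3 location.
import boto3

s3 = boto3.resource("s3")
bucket = s3.Bucket("<your-project-bucket>")
bucket.objects.filter(Prefix="<project-prefix>/data/books_synthetic_reviews/").delete()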

Conclusion

The new visual workflow experience in SageMaker Unified Studio can help you orchestrate your data integration tasks visually, without requiring deep expertise in Airflow. Through the visual interface, data engineers and analysts can focus on their core tasks instead of spending time manually implementing Python DAG code for workflows.

Visual workflows offer several advantages, including an intuitive visual interface for workflow design and automatic conversion of visual workflows to Python DAG definitions. The integration with Airflow and Amazon MWAA further enhances their utility, and improved monitoring capabilities provide greater visibility into workflow runs. Together, these features reduce the time it takes to build workflows. Visual workflows make workflow automation straightforward for a variety of use cases, such as data engineers orchestrating complex ETL pipelines or analysts maintaining regular reports.

We encourage you to explore visual workflows in SageMaker Unified Studio and discover how they can streamline your data processing and analytics workflows. For more information about SageMaker Unified Studio and its features, see the AWS documentation.


About the authors

Naohisa Takahashi is a Senior Cloud Support Engineer on the AWS Support Engineering team. He helps customers resolve technical issues and launch systems. In his spare time, he plays board games with his friends.

Noritaka Sekiyama is a Principal Big Data Architect with AWS Analytics services. He’s responsible for building software artifacts to help customers. In his spare time, he enjoys cycling on his road bike.

Iris Tian is a UX designer on the Amazon SageMaker Unified Studio team. She designs intuitive, end-to-end experiences that simplify and streamline workflows across data processing and orchestration. In her spare time, she enjoys snowboarding and visiting museums.

Regan Baum is a Senior Software Development Engineer on the Amazon SageMaker Unified Studio team. She designs, implements, and maintains features that enable customers to manage their workflows in SageMaker Unified Studio. Outside of work, she enjoys hiking and running.

Yuhang Huang is a Software Development Manager on the Amazon SageMaker Unified Studio team. He leads the engineering team to design, build, and operate scheduling and orchestration capabilities in SageMaker Unified Studio. In his free time, he enjoys playing tennis.

Gal Heyne is a Senior Technical Product Manager for AWS Analytics services with a strong focus on AI/ML and data engineering. She is passionate about developing a deep understanding of customers’ business needs and collaborating with engineers to design simple-to-use data products.