Introducing FabricFlow: Code-First Data Pipelines in Microsoft Fabric

Parth Lad

5 min read

Microsoft
microsoftfabric
Python
data
Open Source

It’s been a while since my last post — I’ve been heads-down working on a few personal projects. One of them is finally ready to share: FabricFlow v0.1.0.

I have been working with Microsoft Fabric since the day it launched, and it has changed significantly over time. I remember that when Data Pipelines were first released, the Copy activity couldn’t connect to on-premises data sources. Later, support for on-prem sources was added, but dynamic parameters still didn’t work. Now, with the latest updates and REST APIs, I feel like there are so many new possibilities. That’s why I started this project: to replace my existing Data Pipelines, which often required numerous activities just to copy data from source systems and were prone to failure if any single step hit an error.

FabricFlow is my code-first approach to copying data from different sources. With the FabricFlow Python SDK, you don’t need to create or manage Data Pipelines through the Microsoft Fabric UI. It ships with ready-to-use pipeline templates that you can deploy directly to your workspace; I suggest using a separate workspace dedicated to these templates. It also simplifies deployment: you can switch sources and destinations just by updating parameters in your notebook, without a complex CI/CD setup.

Getting Started with FabricFlow

Now, let’s walk through a sample workflow that uses FabricFlow to deploy pipeline templates and copy data in Microsoft Fabric.

What is FabricFlow?

FabricFlow is a code-first Python SDK that helps you build, manage, and automate Microsoft Fabric data pipelines, workspaces, and other key items—all through Python code using the Fabric REST API.

It lets you do things like:

  • Deploy data pipelines from prebuilt templates (e.g., SQL Server to Lakehouse Table)

  • Manage your Fabric workspaces and items

  • Use utilities for connection, capacity, and logging

Installation

Just install the package using pip:

pip install fabricflow
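
If you’re working inside a Microsoft Fabric notebook, you can also install it inline (a minimal sketch, assuming the standard %pip magic available in Fabric notebooks):

%pip install fabricflow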

Now, let’s go step-by-step through how to use FabricFlow to copy data from a SQL Server source to a Lakehouse Table in Microsoft Fabric.

Import Required Libraries

import fabricflow as ff
from sempy.fabric import FabricRestClient

We import fabricflow along with FabricRestClient from the sempy package; FabricRestClient is the class used to make REST API calls to Microsoft Fabric. Since sempy is maintained by Microsoft and comes with a reliable authentication setup, it made sense to build my code on top of their client.

Initialize Fabric Client

fabric_client = FabricRestClient()

This sets up the Fabric client, which is used to make all API calls in the background.
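
As a quick sanity check (a minimal sketch, assuming your identity has access to at least one workspace and that the client’s requests-style get() helper is available), you can call the Fabric REST API directly through this client:

# List the workspaces visible to the authenticated identity
response = fabric_client.get("/v1/workspaces")
print(response.json())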

If you want to use a service principal for authentication, follow the steps shared by Sandeep Pawar here: Using Service Principal Authentication with FabricRestClient

Deploy Data Pipeline Templates

Before you can use the copy functionalities, you will need to deploy the necessary pipeline templates to your Fabric workspace. You can deploy all available templates or select specific ones from the list.

# Define workspace name where templates will be deployed
PIPELINE_WORKSPACE_NAME = "FabricFlow"

# Deploy all available data pipeline templates
for template in ff.DataPipelineTemplates:
    ff.create_data_pipeline(
        fabric_client,
        template,
        PIPELINE_WORKSPACE_NAME
    )
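
If you only want a subset of the templates, a minimal sketch is to pick specific members of ff.DataPipelineTemplates instead of looping over all of them (the two below are the ones used later in this post):

# Deploy only the templates used in this walkthrough
selected_templates = [
    ff.DataPipelineTemplates.COPY_SQL_SERVER_TO_LAKEHOUSE_TABLE,
    ff.DataPipelineTemplates.COPY_SQL_SERVER_TO_LAKEHOUSE_TABLE_FOR_EACH,
]

for template in selected_templates:
    ff.create_data_pipeline(
        fabric_client,
        template,
        PIPELINE_WORKSPACE_NAME
    )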

Copy Data

FabricFlow gives you two options for copying data:

  • One table at a time

  • Multiple tables in a single run

Choose what fits your case best.

If you need to create a new connection in Microsoft Fabric, you can find detailed steps in the official documentation: Data source management - Microsoft Fabric.
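
The examples below reference the source connection by its ID. If you only know the connection’s display name, you can resolve it with the helper mentioned in the code comments (a small sketch; the connection name here is a placeholder):

# Resolve a connection's display name to its ID
connection_id = ff.resolve_connection_id(
    fabric_client,
    "my-sql-server-connection",  # placeholder: replace with your connection's name
)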

Option 1: Single Item Per Pipeline Run

This option is good when you want to run one table at a time and control each copy operation separately.

# Define pipeline template
PIPELINE_TEMPLATE = (
    ff.DataPipelineTemplates.COPY_SQL_SERVER_TO_LAKEHOUSE_TABLE.value
)

# Define source details
SOURCE_CONNECTION_ID = "your-source-connection-id"
# If you prefer to use a connection name, use
# ff.resolve_connection_id(fabric_client, "your-connection-name") to get the ID.
SOURCE_DATABASE_NAME = "AdventureWorks2022"
SOURCE_QUERY = "SELECT * FROM [Sales].[SalesOrderHeader]"

# Define sink details
SINK_WORKSPACE = "your-sink-workspace-id-or-name"
SINK_LAKEHOUSE = "your-sink-lakehouse-id-or-name"
SINK_TABLE_NAME = "SalesOrderHeader"
SINK_SCHEMA_NAME = "dbo"
SINK_TABLE_ACTION = "Overwrite"

# Create CopyManager
copy = ff.CopyManager(
    fabric_client,
    PIPELINE_WORKSPACE_NAME, 
    # PIPELINE_WORKSPACE_NAME is defined in the "Deploy Data Pipeline Templates" section
    PIPELINE_TEMPLATE
)

# Define source
source = ff.SQLServerSource(
    source_connection_id=SOURCE_CONNECTION_ID,
    source_database_name=SOURCE_DATABASE_NAME,
    source_query=SOURCE_QUERY,
)

# Define sink
sink = ff.LakehouseTableSink(
    sink_workspace=SINK_WORKSPACE,
    sink_lakehouse=SINK_LAKEHOUSE,
    sink_table_name=SINK_TABLE_NAME,
    sink_schema_name=SINK_SCHEMA_NAME,
    sink_table_action=SINK_TABLE_ACTION,
)

# Run the pipeline
result = (
    copy
    .source(source)
    .sink(sink)
    .execute()
)

This will run the pipeline and copy just one table from SQL Server to your Lakehouse Table.

Option 2: Multiple Items Per Pipeline Run

This is helpful when you want to load many tables at once. It loops through each item in the list and runs them in the same pipeline.

# Define pipeline template
PIPELINE_TEMPLATE = (
    ff.DataPipelineTemplates.COPY_SQL_SERVER_TO_LAKEHOUSE_TABLE_FOR_EACH.value
)

# Define source and sink info
SOURCE_CONNECTION_ID = "your-source-connection-id"
# If you prefer to use a connection name, use
# ff.resolve_connection_id(fabric_client, "your-connection-name") to get the ID.
SOURCE_DATABASE_NAME = "AdventureWorks2022"
SINK_WORKSPACE = "your-sink-workspace-id-or-name"
SINK_LAKEHOUSE = "your-sink-lakehouse-id-or-name"

# Define multiple items to load
ITEMS_TO_LOAD = [
    {
        "source_schema_name": "Sales",
        "source_table_name": "SalesOrderHeader",
        "source_query": "SELECT * FROM [Sales].[SalesOrderHeader]",
        "sink_schema_name": "dbo",
        "sink_table_name": "SalesOrderHeader",
        "sink_table_action": "Overwrite",
        "load_type": "Incremental",
        "primary_key_columns": ["SalesOrderID"],
        "skip": False,
        "load_from_timestamp": None,
        "load_to_timestamp": None,
    },
    {
        "source_schema_name": "Production",
        "source_table_name": "Product",
        "source_query": "SELECT * FROM [Production].[Product]",
        "sink_schema_name": "dbo",
        "sink_table_name": "Product",
        "sink_table_action": "Overwrite",
        "load_type": "Full",
        "skip": False,
        "load_from_timestamp": None,
        "load_to_timestamp": None,
    },
    # Add more tables as needed
]

# Create CopyManager
copy = ff.CopyManager(
    fabric_client,
    PIPELINE_WORKSPACE_NAME, 
    # PIPELINE_WORKSPACE_NAME is defined in the "Deploy Data Pipeline Templates" section
    PIPELINE_TEMPLATE
)

# Define source and sink
source = ff.SQLServerSource(
    source_connection_id=SOURCE_CONNECTION_ID,
    source_database_name=SOURCE_DATABASE_NAME,
)

sink = ff.LakehouseTableSink(
    sink_workspace=SINK_WORKSPACE,
    sink_lakehouse=SINK_LAKEHOUSE,
)

# Run the pipeline with all items
result = (
    copy
    .source(source)
    .sink(sink)
    .items(ITEMS_TO_LOAD)
    .execute()
)

This approach runs a single pipeline that copies multiple tables in one go—it’s faster and cleaner when dealing with multiple tables.

Conclusion

FabricFlow aims to provide a more streamlined and code-centric way to manage your data pipelines in Microsoft Fabric, offering flexibility and automation beyond the UI. I hope this introduction helps you get started with simplifying your data ingestion workflows. For all setup steps, including deploying templates in your Fabric workspaces, please refer to the FabricFlow README.

This post was drafted with the assistance of LLMs to enhance clarity and structure.

Written by

Parth Lad

I'm a data analyst who loves finding insights from numbers and visualizing them. I write about Data Analytics, Data Engineering, Power BI, and DAX on Medium & Hashnode.

Follow me for more!✌️😉