Introducing FabricFlow: Code-First Data Pipelines in Microsoft Fabric
Parth Lad

It’s been a while since my last post — I’ve been heads-down working on a few personal projects. One of them is finally ready to share: FabricFlow v0.1.0.
I have been working with Microsoft Fabric since the day it launched. It has changed significantly over time. I remember when Data Pipelines were first released, the copy activity couldn’t connect to on-premises data sources. Later, support for on-prem sources was added, but dynamic parameters still didn’t work. Now, with the latest updates and REST APIs, I feel like there are so many new possibilities. That’s why I started this project—to replace my existing Data Pipelines, which often required numerous activities just to copy data from source systems and were prone to failure if any step encountered an error.
FabricFlow is my code-first approach to copying data from different sources. With the FabricFlow Python SDK, you don’t need to create or manage Data Pipelines through the Microsoft Fabric UI. It comes with ready-to-use pipeline templates that you can deploy directly to your workspace; I suggest using a separate workspace specifically for these templates. It also simplifies deployment: you can switch sources and destinations by just updating parameters in your notebook, without a complex CI/CD setup.
Getting Started with FabricFlow
Now, let’s walk through a sample workflow that uses FabricFlow to deploy pipeline templates and copy data in Microsoft Fabric.
What is FabricFlow?
FabricFlow is a code-first Python SDK that helps you build, manage, and automate Microsoft Fabric data pipelines, workspaces, and other key items—all through Python code using the Fabric REST API.
It lets you do things like:
Deploy data pipelines from prebuilt templates (e.g., SQL Server to Lakehouse Table)
Manage your Fabric workspaces and items
Use utilities for connection, capacity, and logging
Installation
Just install the package using pip:
pip install fabricflow
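After installing, a quick way to confirm everything is wired up is to import the package and list the pipeline templates it ships with. This only uses the DataPipelineTemplates enum that the deployment step below relies on:
import fabricflow as ff
# List the pipeline templates bundled with FabricFlow
for template in ff.DataPipelineTemplates:
    print(template)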
Now, let’s go step-by-step through how to use FabricFlow to copy data from a SQL Server source to a Lakehouse Table in Microsoft Fabric.
Import Required Libraries
import fabricflow as ff
from sempy.fabric import FabricRestClient
We import fabricflow, along with FabricRestClient from the sempy package, which is used to make REST API calls to Microsoft Fabric. Since sempy is maintained by Microsoft and comes with a reliable authentication setup, it made sense to build FabricFlow on top of its client.
Initialize Fabric Client
fabric_client = FabricRestClient()
This sets up the Fabric client, which is used to make all API calls in the background.
If you want to use a service principal for authentication, follow the steps shared by Sandeep Pawar here: Using Service Principal Authentication with FabricRestClient
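For reference, a minimal sketch of that service principal setup might look like the one below. Treat it as an assumption-heavy example: it presumes FabricRestClient accepts a token provider callable (as described in Sandeep’s post) and that the service principal already has access to your tenant. Follow the linked post for the exact, supported setup.
from azure.identity import ClientSecretCredential
from sempy.fabric import FabricRestClient
# Assumption: FabricRestClient can be handed a callable that returns a bearer token.
# Verify the parameter name and token scope against the linked post and sempy docs.
credential = ClientSecretCredential(
    tenant_id="your-tenant-id",
    client_id="your-client-id",
    client_secret="your-client-secret",
)
def token_provider():
    return credential.get_token("https://api.fabric.microsoft.com/.default").token
fabric_client = FabricRestClient(token_provider=token_provider)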
Deploy Data Pipeline Templates
Before you can use the copy functionality, you need to deploy the necessary pipeline templates to your Fabric workspace. You can deploy all available templates, as shown below, or pick specific ones from the list (a selective example follows the loop).
# Define workspace name where templates will be deployed
PIPELINE_WORKSPACE_NAME = "FabricFlow"
# Deploy all available data pipeline templates
for template in ff.DataPipelineTemplates:
    ff.create_data_pipeline(
        fabric_client,
        template,
        PIPELINE_WORKSPACE_NAME,
    )
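If you only need some of the templates, you can deploy a specific subset instead of looping over the whole enum. A small sketch using the two templates this post relies on:
# Deploy only the templates used later in this post
selected_templates = [
    ff.DataPipelineTemplates.COPY_SQL_SERVER_TO_LAKEHOUSE_TABLE,
    ff.DataPipelineTemplates.COPY_SQL_SERVER_TO_LAKEHOUSE_TABLE_FOR_EACH,
]
for template in selected_templates:
    ff.create_data_pipeline(
        fabric_client,
        template,
        PIPELINE_WORKSPACE_NAME,
    )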
Copy Data
FabricFlow gives you two options for copying data:
One table at a time
Multiple tables in a single run
Choose what fits your case best.
If you need to create a new connection in Microsoft Fabric, you can find detailed steps in the official documentation: Data source management - Microsoft Fabric.
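Once the connection exists, you will need its ID for the source definition. If you only know the connection’s display name, you can resolve the ID with the helper mentioned in the code comments below:
# Resolve a connection ID from its display name ("your-connection-name" is a placeholder)
connection_id = ff.resolve_connection_id(fabric_client, "your-connection-name")
print(connection_id)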
Option 1: Single Item Per Pipeline Run
This option is good when you want to run one table at a time and control each copy operation separately.
# Define pipeline template
PIPELINE_TEMPLATE = ff.DataPipelineTemplates.COPY_SQL_SERVER_TO_LAKEHOUSE_TABLE.value
# Define source details
SOURCE_CONNECTION_ID = "your-source-connection-id"
# If you prefer to use connection name, use
# ff.resolve_connection_id(fabric_client, "your-connection-name") to get the ID.
SOURCE_DATABASE_NAME = "AdventureWorks2022"
SOURCE_QUERY = "SELECT * FROM [Sales].[SalesOrderHeader]"
# Define sink details
SINK_WORKSPACE = "your-sink-workspace-id-or-name"
SINK_LAKEHOUSE = "your-sink-lakehouse-id-or-name"
SINK_TABLE_NAME = "SalesOrderHeader"
SINK_SCHEMA_NAME = "dbo"
SINK_TABLE_ACTION = "Overwrite"
# Create CopyManager
copy = ff.CopyManager(
    fabric_client,
    PIPELINE_WORKSPACE_NAME,  # defined in the "Deploy Data Pipeline Templates" section
    PIPELINE_TEMPLATE,
)
# Define source
source = ff.SQLServerSource(
    source_connection_id=SOURCE_CONNECTION_ID,
    source_database_name=SOURCE_DATABASE_NAME,
    source_query=SOURCE_QUERY,
)
# Define sink
sink = ff.LakehouseTableSink(
    sink_workspace=SINK_WORKSPACE,
    sink_lakehouse=SINK_LAKEHOUSE,
    sink_table_name=SINK_TABLE_NAME,
    sink_schema_name=SINK_SCHEMA_NAME,
    sink_table_action=SINK_TABLE_ACTION,
)
# Run the pipeline
result = (
    copy
    .source(source)
    .sink(sink)
    .execute()
)
This will run the pipeline and copy just one table from SQL Server to your Lakehouse Table.
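If you copy single tables like this often, it can be convenient to fold the pattern above into a small helper. The function below is just a local sketch (copy_table is not part of FabricFlow); it reuses the variables defined earlier and creates a fresh CopyManager per call so each run stays independent:
def copy_table(client, query, table_name, schema_name="dbo", table_action="Overwrite"):
    # Local convenience wrapper around the source/sink/execute pattern shown above
    copy = ff.CopyManager(client, PIPELINE_WORKSPACE_NAME, PIPELINE_TEMPLATE)
    source = ff.SQLServerSource(
        source_connection_id=SOURCE_CONNECTION_ID,
        source_database_name=SOURCE_DATABASE_NAME,
        source_query=query,
    )
    sink = ff.LakehouseTableSink(
        sink_workspace=SINK_WORKSPACE,
        sink_lakehouse=SINK_LAKEHOUSE,
        sink_table_name=table_name,
        sink_schema_name=schema_name,
        sink_table_action=table_action,
    )
    return copy.source(source).sink(sink).execute()
# Example: copy a second table with the same connection and Lakehouse settings
result = copy_table(fabric_client, "SELECT * FROM [Sales].[SalesOrderDetail]", "SalesOrderDetail")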
Option 2: Multiple Items Per Pipeline Run
This is helpful when you want to load many tables at once. It loops through each item in the list and runs them in the same pipeline.
# Define pipeline template
PIPELINE_TEMPLATE = ff.DataPipelineTemplates.COPY_SQL_SERVER_TO_LAKEHOUSE_TABLE_FOR_EACH.value
# Define source and sink info
SOURCE_CONNECTION_ID = "your-source-connection-id"
# If you prefer to use connection name, use
# ff.resolve_connection_id(fabric_client, "your-connection-name") to get the ID.
SOURCE_DATABASE_NAME = "AdventureWorks2022"
SINK_WORKSPACE = "your-sink-workspace-id-or-name"
SINK_LAKEHOUSE = "your-sink-lakehouse-id-or-name"
# Define multiple items to load
ITEMS_TO_LOAD = [
    {
        "source_schema_name": "Sales",
        "source_table_name": "SalesOrderHeader",
        "source_query": "SELECT * FROM [Sales].[SalesOrderHeader]",
        "sink_schema_name": "dbo",
        "sink_table_name": "SalesOrderHeader",
        "sink_table_action": "Overwrite",
        "load_type": "Incremental",
        "primary_key_columns": ["SalesOrderID"],
        "skip": False,
        "load_from_timestamp": None,
        "load_to_timestamp": None,
    },
    {
        "source_schema_name": "Production",
        "source_table_name": "Product",
        "source_query": "SELECT * FROM [Production].[Product]",
        "sink_schema_name": "dbo",
        "sink_table_name": "Product",
        "sink_table_action": "Overwrite",
        "load_type": "Full",
        "skip": False,
        "load_from_timestamp": None,
        "load_to_timestamp": None,
    },
    # Add more tables as needed
]
# Create CopyManager
copy = ff.CopyManager(
    fabric_client,
    PIPELINE_WORKSPACE_NAME,  # defined in the "Deploy Data Pipeline Templates" section
    PIPELINE_TEMPLATE,
)
# Define source and sink
source = ff.SQLServerSource(
    source_connection_id=SOURCE_CONNECTION_ID,
    source_database_name=SOURCE_DATABASE_NAME,
)
sink = ff.LakehouseTableSink(
    sink_workspace=SINK_WORKSPACE,
    sink_lakehouse=SINK_LAKEHOUSE,
)
# Run the pipeline with all items
result = (
    copy
    .source(source)
    .sink(sink)
    .items(ITEMS_TO_LOAD)
    .execute()
)
This approach runs a single pipeline that copies all the listed tables in one go, which is faster and cleaner than triggering a separate run per table.
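For a larger source system, you probably don’t want to hand-write every dictionary. One option is to generate ITEMS_TO_LOAD from a simple list of schema/table pairs; the sketch below uses the same keys as the example above and assumes full Overwrite loads (adjust load_type, primary_key_columns, and so on per table as needed):
# (schema, table) pairs to load as full refreshes
tables = [
    ("Sales", "SalesOrderHeader"),
    ("Sales", "SalesOrderDetail"),
    ("Production", "Product"),
]
ITEMS_TO_LOAD = [
    {
        "source_schema_name": schema,
        "source_table_name": table,
        "source_query": f"SELECT * FROM [{schema}].[{table}]",
        "sink_schema_name": "dbo",
        "sink_table_name": table,
        "sink_table_action": "Overwrite",
        "load_type": "Full",
        "skip": False,
        "load_from_timestamp": None,
        "load_to_timestamp": None,
    }
    for schema, table in tables
]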
Conclusion
FabricFlow aims to provide a more streamlined and code-centric way to manage your data pipelines in Microsoft Fabric, offering flexibility and automation beyond the UI. I hope this introduction helps you get started with simplifying your data ingestion workflows. For all setup steps, including deploying templates in your Fabric workspaces, please refer to the FabricFlow README.
This post was drafted with the assistance of LLMs to enhance clarity and structure.