PyFlink is the Python API for Apache Flink, which lets you develop batch and stream data processing pipelines on modern distributed computing architectures. Apache Flink and its PyFlink Python bindings expose a concise yet powerful relational API through the Table API and standard SQL. The Table API and SQL interfaces are engineered to be interchangeable, and they even play nicely with the more flexible DataStream API. In PyFlink these relational APIs also integrate with popular tabular libraries from the Python data stack, such as Pandas, which makes PyFlink a great addition to the vast ecosystem of Python-based big data toolsets.
Table API: uses a high-level programming language like Python to process and manipulate data with functional methods such as select(...), group_by(...), filter(...) and others, which are decomposed and optimized into lower-level Flink DataStream API implementations.
SQL: uses standard SQL to process data, which gets decomposed and optimized into lower-level Flink DataStream API implementations.
Both the Table API and the SQL interface in Apache Flink use a common data structure, or abstraction around relational data, referred to as a Table. Tables are queried as sources and return data structures that can be operated on with both relational APIs as well as integrated with the DataStream API. The operations performed on a Table are lazily evaluated and chainable. Tables are emitted to target sinks by write operations. Each Table is uniquely identified by a three-part naming scheme of <catalog_name>.<database_name>.<table_name>. Tables are defined within an execution environment and cannot be accessed or joined from other execution environments.
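To make "lazily evaluated and chainable" a little more concrete, here is a minimal plain-Python sketch of the idea. It is not how Flink implements a Table; ToyTable, read_orders and the reads list are hypothetical names invented for this illustration. Each method only records a step and returns a new object, and the source is not scanned until collect() is called.

```python
from dataclasses import dataclass
from typing import Callable, Dict, Iterable, List, Tuple

Row = Dict[str, object]
Step = Callable[[Iterable[Row]], Iterable[Row]]


@dataclass(frozen=True)
class ToyTable:
    """Hypothetical stand-in for a Table: a source plus the steps still to run."""
    source: Callable[[], Iterable[Row]]
    steps: Tuple[Step, ...] = ()

    def filter(self, predicate: Callable[[Row], bool]) -> "ToyTable":
        def step(rows: Iterable[Row]) -> Iterable[Row]:
            return (row for row in rows if predicate(row))
        # Record the step on a NEW table; nothing is read yet.
        return ToyTable(self.source, self.steps + (step,))

    def add_column(self, name: str, fn: Callable[[Row], object]) -> "ToyTable":
        def step(rows: Iterable[Row]) -> Iterable[Row]:
            return ({**row, name: fn(row)} for row in rows)
        return ToyTable(self.source, self.steps + (step,))

    def collect(self) -> List[Row]:
        # Only now is the source scanned and each recorded step applied in order.
        rows = self.source()
        for step in self.steps:
            rows = step(rows)
        return list(rows)


reads = []  # records every time the source is actually scanned


def read_orders() -> List[Row]:
    reads.append("scan")
    return [
        {"seller_id": "LNK", "quantity": 22, "product_price": 3.99},
        {"seller_id": "OMA", "quantity": 29, "product_price": 3.99},
    ]


orders = ToyTable(read_orders)
lnk_sales = (
    orders.filter(lambda r: r["seller_id"] == "LNK")
          .add_column("sales", lambda r: round(r["quantity"] * r["product_price"], 2))
)
scans_before_collect = len(reads)  # chaining alone has not touched the source
result = lnk_sales.collect()
```

After this runs, scans_before_collect is 0, the source has been scanned exactly once, and orders itself still has no recorded steps. That mirrors, in miniature, how chained Table operations describe a pipeline rather than execute it.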
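There are two categories of tables: temporary tables, which are tied to the lifecycle of a single Flink session, and permanent tables, which are stored in a catalog and remain visible across sessions.
A catalog is a metadata repository for storing data pertaining to databases, tables, partitions, views and functions.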
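As a rough mental model of how a three-part name resolves against such a metadata repository, the sketch below uses nothing but nested dictionaries. It is an assumption-laden toy, not Flink's catalog API: TablePath, parse_table_path, toy_catalogs, lookup_columns and the demo_catalog and demo_db names are all made up for illustration, and quoted identifiers that themselves contain dots are ignored.

```python
from typing import Dict, List, NamedTuple


class TablePath(NamedTuple):
    catalog: str
    database: str
    table: str


def parse_table_path(identifier: str) -> TablePath:
    """Split '<catalog_name>.<database_name>.<table_name>' into its three parts."""
    parts = identifier.split(".")
    if len(parts) != 3 or not all(parts):
        raise ValueError(f"expected catalog.database.table, got {identifier!r}")
    return TablePath(*parts)


# Hypothetical metadata: catalog -> database -> table -> column names.
toy_catalogs: Dict[str, Dict[str, Dict[str, List[str]]]] = {
    "demo_catalog": {
        "demo_db": {
            "locale_product_sales": [
                "seller_id", "product", "quantity", "product_price", "sales_date",
            ],
        }
    }
}


def lookup_columns(identifier: str) -> List[str]:
    """Resolve a fully qualified table name to its column names."""
    path = parse_table_path(identifier)
    return toy_catalogs[path.catalog][path.database][path.table]


columns = lookup_columns("demo_catalog.demo_db.locale_product_sales")
```

The point is only that the catalog holds descriptions of tables (names, columns and so on) rather than the data itself, and that the fully qualified name is the key used to find them.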
In order to work with PyFlink, and thus Apache Flink, you'll need Java 8 or 11 (preferably Java 11) along with Python 3.7 or 3.8 (preferably 3.8, as that is what I'm using during this demonstration). For Java I recommend the JDK from either AdoptOpenJDK or Azul.
PyFlink is available on PyPI, so you can simply pip install it, but I recommend doing so in a project-specific Python virtual environment.
Create Python Virtual Environment
python3 -m venv flink-env
Activate on MacOS or Linux
source flink-env/bin/activate
Activate on Windows
flink-env\Scripts\activate.bat
Then pip install PyFlink
(flink-env) pip install apache-flink
In this section I briefly introduce the Table API and SQL interface through a batch processing example that calculates sales revenue for two locations from input sales order data in CSV format.
Below is the sales order data file named dental-hygiene-orders.csv
seller_id,product,quantity,product_price,sales_date
LNK,Toothbrush,22,3.99,2021-07-01
LNK,Dental Floss,17,1.99,2021-07-01
LNK,Toothpaste,8,4.99,2021-07-01
OMA,Toothbrush,29,3.99,2021-07-01
OMA,Toothpaste,9,4.99,2021-07-01
OMA,Dental Floss,23,1.99,2021-07-01
LNK,Toothbrush,25,3.99,2021-07-02
LNK,Dental Floss,16,1.99,2021-07-02
LNK,Toothpaste,9,4.99,2021-07-02
OMA,Toothbrush,32,3.99,2021-07-02
OMA,Toothpaste,13,4.99,2021-07-02
OMA,Dental Floss,18,1.99,2021-07-02
LNK,Toothbrush,20,3.99,2021-07-03
LNK,Dental Floss,15,1.99,2021-07-03
LNK,Toothpaste,11,4.99,2021-07-03
OMA,Toothbrush,31,3.99,2021-07-03
OMA,Toothpaste,10,4.99,2021-07-03
OMA,Dental Floss,21,1.99,2021-07-03
First I will demonstrate the Table API. The below code is saved to a file named locale_totals_tableapi.py
from pyflink.table import (
    DataTypes, TableEnvironment, EnvironmentSettings,
    CsvTableSource, CsvTableSink, WriteMode
)


def main():
    env_settings = EnvironmentSettings.new_instance()\
                        .in_batch_mode()\
                        .use_blink_planner()\
                        .build()
    tbl_env = TableEnvironment.create(env_settings)

    in_field_names = ['seller_id', 'product', 'quantity', 'product_price', 'sales_date']
    in_field_types = [DataTypes.STRING(), DataTypes.STRING(), DataTypes.INT(), DataTypes.DOUBLE(), DataTypes.DATE()]
    source = CsvTableSource(
        './input',
        in_field_names,
        in_field_types,
        ignore_first_line=True
    )
    tbl_env.register_table_source('locale_product_sales', source)

    out_field_names = ['seller_id', 'revenue']
    out_field_types = [DataTypes.STRING(), DataTypes.DOUBLE()]
    sink = CsvTableSink(
        out_field_names,
        out_field_types,
        './table-api-output/revenue.csv',
        num_files=1,
        write_mode=WriteMode.OVERWRITE
    )
    tbl_env.register_table_sink('locale_revenue', sink)

    tbl = tbl_env.from_path('locale_product_sales')

    sales_tbl = tbl.add_columns(
        (tbl.quantity * tbl.product_price).alias('sales')
    )

    print('\nIntermediate Product Sales Schema')
    sales_tbl.print_schema()

    output_tbl = sales_tbl.group_by(sales_tbl.seller_id)\
                          .select(sales_tbl.seller_id,
                                  sales_tbl.sales.sum.alias('revenue'))

    print('\nLocale Revenue Schema')
    output_tbl.print_schema()

    output_tbl.execute_insert('locale_revenue').wait()


if __name__ == '__main__':
    main()
Running the Flink Table API program is the same as running any other Python program. Issue the following with the active Python virtual environment.
python locale_totals_tableapi.py
The above program creates a TableEnvironment in batch processing mode using the Blink query planner. The TableEnvironment is the primary interface that our programs interact with to submit programming logic to the Apache Flink distributed runtime which parallelizes computational work across different cores and machines (depending on how Flink is configured and deployed).
Next the program registers a CSV Table source and sink with the TableEnvironment. Sources and Sinks are the conduits through which Flink's Table API (and SQL) interacts with data in external systems.
With the source and sink tables registered, the program constructs a Table object through the TableEnvironment.from_path(...) method, which pulls data in from the source CSV file in a directory named input.
After that I add a column to the Table object representing the sales, computed as the product quantity multiplied by the product price, and alias, or rename, the resulting derived column as sales. This operation, like all other operations on a Table object, is a non-mutating transformation: it creates a new Table object, here one that carries the newly derived column.
I then use the functional methods of the new Table object to aggregate the sales revenue by seller. This is accomplished by grouping by seller via the group_by(...) table method, then chaining a projection of the seller and the sum of sales via the select(...) table method. The result of these chained operations is yet another new Table object, which is assigned to the variable output_tbl. The resultant table is then emitted to the sink table as a CSV output file within a directory named table-api-output, shown below.
LNK,502.57
OMA,650.14
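As a sanity check on those two totals, the same arithmetic can be reproduced without Flink. The sketch below is plain Python using only the standard library and is not part of the PyFlink program. ORDERS_CSV and revenue_by_seller are names I introduce here, and I use Decimal rather than the DOUBLE type from the Flink schema so the cents add up exactly.

```python
import csv
import io
from collections import defaultdict
from decimal import Decimal

# Contents of dental-hygiene-orders.csv, inlined so the check is self-contained.
ORDERS_CSV = """seller_id,product,quantity,product_price,sales_date
LNK,Toothbrush,22,3.99,2021-07-01
LNK,Dental Floss,17,1.99,2021-07-01
LNK,Toothpaste,8,4.99,2021-07-01
OMA,Toothbrush,29,3.99,2021-07-01
OMA,Toothpaste,9,4.99,2021-07-01
OMA,Dental Floss,23,1.99,2021-07-01
LNK,Toothbrush,25,3.99,2021-07-02
LNK,Dental Floss,16,1.99,2021-07-02
LNK,Toothpaste,9,4.99,2021-07-02
OMA,Toothbrush,32,3.99,2021-07-02
OMA,Toothpaste,13,4.99,2021-07-02
OMA,Dental Floss,18,1.99,2021-07-02
LNK,Toothbrush,20,3.99,2021-07-03
LNK,Dental Floss,15,1.99,2021-07-03
LNK,Toothpaste,11,4.99,2021-07-03
OMA,Toothbrush,31,3.99,2021-07-03
OMA,Toothpaste,10,4.99,2021-07-03
OMA,Dental Floss,21,1.99,2021-07-03
"""


def revenue_by_seller(csv_text: str) -> dict:
    """Mirror the pipeline: derive sales per row, then sum sales per seller."""
    totals = defaultdict(Decimal)  # a missing seller starts at Decimal('0')
    for row in csv.DictReader(io.StringIO(csv_text)):
        sales = int(row["quantity"]) * Decimal(row["product_price"])  # the derived column
        totals[row["seller_id"]] += sales                              # group by + sum
    return dict(totals)


revenue = revenue_by_seller(ORDERS_CSV)
for seller_id in sorted(revenue):
    print(f"{seller_id},{revenue[seller_id]}")
```

This prints LNK,502.57 and OMA,650.14, matching the CSV that the Flink job wrote.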
Now that I've shown how the Table API can be used, I pivot to showing the same program implemented using the SQL interface. It is worth noting up front that SQL-based operations return Table objects identical to those returned by the more functional Table methods of the Table API. This allows us to mix and match the different programming paradigms, which I find very flexible and powerful.
The below code uses SQL to implement the seller revenue logic shown previously and is saved to a file named locale_totals_sql.py
from pyflink.table import (
    DataTypes, TableEnvironment, EnvironmentSettings,
    CsvTableSource, CsvTableSink, WriteMode
)


def main():
    env_settings = EnvironmentSettings.new_instance()\
                        .in_batch_mode()\
                        .use_blink_planner()\
                        .build()
    tbl_env = TableEnvironment.create(env_settings)

    in_field_names = ['seller_id', 'product', 'quantity', 'product_price', 'sales_date']
    in_field_types = [DataTypes.STRING(), DataTypes.STRING(), DataTypes.INT(), DataTypes.DOUBLE(), DataTypes.DATE()]
    source = CsvTableSource(
        './input',
        in_field_names,
        in_field_types,
        ignore_first_line=True
    )
    tbl_env.register_table_source('locale_product_sales', source)

    out_field_names = ['seller_id', 'revenue']
    out_field_types = [DataTypes.STRING(), DataTypes.DOUBLE()]
    sink = CsvTableSink(
        out_field_names,
        out_field_types,
        './sql-output/revenue.csv',
        num_files=1,
        write_mode=WriteMode.OVERWRITE
    )
    tbl_env.register_table_sink('locale_revenue', sink)

    sql = """
        SELECT t.seller_id AS seller_id, SUM(t.sales) AS revenue
        FROM (
            SELECT seller_id, product, quantity, product_price, sales_date,
                quantity * product_price AS sales
            FROM locale_product_sales
        ) t
        GROUP BY t.seller_id
    """
    output_tbl = tbl_env.sql_query(sql)

    print('\nLocale Revenue Schema')
    output_tbl.print_schema()

    output_tbl.execute_insert('locale_revenue').wait()


if __name__ == '__main__':
    main()
The code is nearly identical to the Table API version shown previously except that the seller revenue calculation is performed in standard SQL. The output this time is saved in a CSV file within a directory named sql-output but the results are the same.
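Because the query is plain standard SQL, one quick way to convince yourself of its logic is to run the very same statement against a different engine. The sketch below does that with Python's built-in sqlite3 module. SQLite is not Flink, so this only checks the query's logic, not Flink's execution; the in-memory table, its SQLite column types and the ORDERS_CSV name are my own assumptions for the illustration.

```python
import csv
import io
import sqlite3

# Contents of dental-hygiene-orders.csv, inlined so the check is self-contained.
ORDERS_CSV = """seller_id,product,quantity,product_price,sales_date
LNK,Toothbrush,22,3.99,2021-07-01
LNK,Dental Floss,17,1.99,2021-07-01
LNK,Toothpaste,8,4.99,2021-07-01
OMA,Toothbrush,29,3.99,2021-07-01
OMA,Toothpaste,9,4.99,2021-07-01
OMA,Dental Floss,23,1.99,2021-07-01
LNK,Toothbrush,25,3.99,2021-07-02
LNK,Dental Floss,16,1.99,2021-07-02
LNK,Toothpaste,9,4.99,2021-07-02
OMA,Toothbrush,32,3.99,2021-07-02
OMA,Toothpaste,13,4.99,2021-07-02
OMA,Dental Floss,18,1.99,2021-07-02
LNK,Toothbrush,20,3.99,2021-07-03
LNK,Dental Floss,15,1.99,2021-07-03
LNK,Toothpaste,11,4.99,2021-07-03
OMA,Toothbrush,31,3.99,2021-07-03
OMA,Toothpaste,10,4.99,2021-07-03
OMA,Dental Floss,21,1.99,2021-07-03
"""

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE locale_product_sales ("
    "seller_id TEXT, product TEXT, quantity INTEGER, "
    "product_price REAL, sales_date TEXT)"
)
rows = [
    (r["seller_id"], r["product"], int(r["quantity"]),
     float(r["product_price"]), r["sales_date"])
    for r in csv.DictReader(io.StringIO(ORDERS_CSV))
]
conn.executemany("INSERT INTO locale_product_sales VALUES (?, ?, ?, ?, ?)", rows)

# The same statement that is passed to tbl_env.sql_query(...) in the article.
sql = """
    SELECT t.seller_id AS seller_id, SUM(t.sales) AS revenue
    FROM (
        SELECT seller_id, product, quantity, product_price, sales_date,
            quantity * product_price AS sales
        FROM locale_product_sales
    ) t
    GROUP BY t.seller_id
"""
# Round to cents because REAL columns are floating point.
revenue = {seller_id: round(total, 2) for seller_id, total in conn.execute(sql)}
conn.close()
```

Here revenue comes out as 502.57 for LNK and 650.14 for OMA, the same figures the Flink job produces.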
Interested in learning more about Apache Flink's relational APIs, including how to perform both batch and stream processing? Take my Python-based course on Apache Flink's Table API and SQL, where I explain how to harness the power of Flink's unified approach to batch and stateful streaming computations through a mixture of theory, practical examples and code walk-throughs.
In this short article I gave a quick and dirty introduction to the Python-based approach to relational batch processing with Apache Flink's Table API and SQL interface.
As always, thanks for reading and don't be shy about commenting or critiquing below.