PipeLayer 0.7.0
A new event, step_end, was added in v0.7.0. It passes the manifest_entry and data in the event args.
This example shows how the step_end event can be used to log the manifest_entry for each step in a pipeline:

import logging

from pipelayer import Filter, Pipeline, PipelineEventArgs

# The root logger defaults to WARNING, so enable INFO to see the entries
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()


class MyFilter(Filter):
    def run(self, data, context):
        return f"{data} has been modified"


# Handles the pipeline step_end event
def my_pipeline_step_end(obj: Pipeline, args: PipelineEventArgs):
    logger.info(args.manifest_entry)


my_filter = MyFilter()
my_pipeline = Pipeline(steps=[
    my_filter
])
# Subscribe the handler; it is called after each step finishes
my_pipeline.step_end += my_pipeline_step_end

output = my_pipeline.run("Data", None)
# output is "Data has been modified"
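
The `+=` subscription used above is a common event-hook pattern. The sketch below is not PipeLayer's implementation; it is a minimal, self-contained illustration of how such a hook could work, assuming a hypothetical `Event` class, a toy `TinyPipeline`, a stand-in `StepEndArgs` type, and a dict-shaped manifest entry (all invented here for illustration).

```python
from dataclasses import dataclass
from typing import Any, Callable, List


@dataclass
class StepEndArgs:
    """Hypothetical stand-in for the event args (not PipeLayer's class)."""
    manifest_entry: Any
    data: Any


class Event:
    """Minimal event that supports `event += handler` subscription."""

    def __init__(self) -> None:
        self._handlers: List[Callable[[Any, Any], None]] = []

    def __iadd__(self, handler):
        self._handlers.append(handler)
        return self  # returning self keeps the attribute bound to this Event

    def __isub__(self, handler):
        self._handlers.remove(handler)
        return self

    def fire(self, sender, args) -> None:
        # Iterate over a copy so handlers may unsubscribe while being called
        for handler in list(self._handlers):
            handler(sender, args)


class TinyPipeline:
    """Toy pipeline: runs callables in order and fires step_end after each."""

    def __init__(self, steps):
        self.steps = steps
        self.step_end = Event()

    def run(self, data):
        for step in self.steps:
            data = step(data)
            entry = {"name": step.__name__}  # assumed manifest shape
            self.step_end.fire(self, StepEndArgs(entry, data))
        return data


def shout(data):
    return data.upper()


def exclaim(data):
    return data + "!"


seen = []


def on_step_end(sender, args):
    # Record which step finished and the data it produced
    seen.append((args.manifest_entry["name"], args.data))


pipeline = TinyPipeline([shout, exclaim])
pipeline.step_end += on_step_end
result = pipeline.run("data")
```

After the run, `seen` holds one record per step, in execution order, which is the same information the logging handler above would write out one entry at a time.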