First things first: It’s been a little over a month since the first version of PipeLayer was published and to celebrate, it got a shiny new logo!
The events that were added to the Filter abstract class now also apply to the Pipeline and Switch, since both inherit from Filter. With these events in place, the pre/post process methods in the Filter class became somewhat redundant, so removing them streamlined the pipeline runner.
Also, you may have noticed the assignment operator used to add event handlers. This was added in v0.5.1. Now handlers can be added in a variety of ways:
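As a rough illustration, and assuming the operator in question is += (as with C# events), here is a minimal sketch of an event handler list that accepts handlers through += (a single callable or a list of them) as well as through append. The EventHandlers class and its fire method are hypothetical names, not PipeLayer's API.

```python
from typing import Any, Callable, Iterable, Union

# Hypothetical handler signature: (sender, event_args) -> None
Handler = Callable[[Any, Any], None]


class EventHandlers(list):
    """Hypothetical handler list; not PipeLayer's implementation."""

    def __iadd__(self, other: Union[Handler, Iterable[Handler]]) -> "EventHandlers":
        # A single callable is appended; any other iterable is extended.
        if callable(other):
            self.append(other)
        else:
            self.extend(other)
        return self

    def fire(self, sender: Any, args: Any) -> None:
        # Invoke handlers in the order they were registered.
        for handler in self:
            handler(sender, args)


calls = []
start = EventHandlers()
start += lambda sender, args: calls.append(("plus-equals", args))
start += [lambda sender, args: calls.append(("list", args))]
start.append(lambda sender, args: calls.append(("append", args)))
start.fire(None, "payload")
print(calls)
# [('plus-equals', 'payload'), ('list', 'payload'), ('append', 'payload')]
```

Overriding __iadd__ matters here: a plain list would raise a TypeError on += with a single (non-iterable) function.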
This release introduces Filter Events and Pipeline Actions.
These events are raised by the Filter object:
start
exit
end
They can be handled by attaching a callable to each event. The start and end events are raised automatically, but the exit event is triggered by setting the action in the event handler to either EXIT or SKIP.
Like this:
from pipelayer import Filter, FilterEventArgs, Pipeline, Action
from pipelayer.filter import raise_events


class MyFilter(Filter):
    @raise_events  # Raises the events
    def run(self, data, context):
        return f"{data} has been changed"


# Handles the filter start event
def my_filter_start(obj: Filter, args: FilterEventArgs):
    args.data = "Filter Skipped"
    args.action = Action.SKIP  # This will skip over the step


my_filter = MyFilter()
my_filter.start.append(my_filter_start)

my_pipeline = Pipeline(steps=[
    my_filter
])

output = my_pipeline.run("Some data...", None)
# output is "Filter Skipped"
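To show the control flow behind these events, here is a self-contained sketch that does not use PipeLayer. EventedFilter, EventArgs, Action and run_pipeline are hypothetical stand-ins. It assumes SKIP bypasses only the current step while EXIT stops the whole pipeline, which is one plausible reading of the description above.

```python
from dataclasses import dataclass
from enum import Enum, auto
from typing import Any, Callable, List


class Action(Enum):
    # Hypothetical stand-in for the Action values described above
    CONTINUE = auto()
    SKIP = auto()  # assumption: skip only the current step
    EXIT = auto()  # assumption: stop the whole pipeline


@dataclass
class EventArgs:
    data: Any
    context: Any = None
    action: Action = Action.CONTINUE


Handler = Callable[[Any, EventArgs], None]


class EventedFilter:
    """Hypothetical filter that raises start/exit/end events around its work."""

    def __init__(self, func: Callable[[Any, Any], Any]) -> None:
        self.func = func
        self.start: List[Handler] = []
        self.exit: List[Handler] = []
        self.end: List[Handler] = []

    def _raise(self, handlers: List[Handler], args: EventArgs) -> None:
        for handler in handlers:
            handler(self, args)

    def run(self, data: Any, context: Any = None) -> EventArgs:
        args = EventArgs(data, context)
        self._raise(self.start, args)
        if args.action in (Action.SKIP, Action.EXIT):
            # A start handler asked to bail out: raise exit instead of running
            self._raise(self.exit, args)
            return args
        args.data = self.func(args.data, context)
        self._raise(self.end, args)
        return args


def run_pipeline(steps: List[EventedFilter], data: Any, context: Any = None) -> Any:
    for step in steps:
        result = step.run(data, context)
        data = result.data
        if result.action is Action.EXIT:
            break  # EXIT short-circuits the remaining steps
    return data


def skip_step(sender: Any, args: EventArgs) -> None:
    args.data = "Filter Skipped"
    args.action = Action.SKIP


exits: List[str] = []
changer = EventedFilter(lambda d, c: f"{d} has been changed")
changer.start.append(skip_step)
changer.exit.append(lambda sender, args: exits.append(args.action.name))
print(run_pipeline([changer], "Some data..."))  # Filter Skipped
```

Because the handler mutates the shared EventArgs object, whatever it writes to data becomes the step's output. That is how the skipped step still hands a value to the next step.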
It’s that time. Another week, another new version. This one has only one visible addition: the Switch filter. It works (and reads) very much like its counterparts in C# and JavaScript:
switch = Switch(
    CarColor, {  # The Expression (implements Step), returns an enum
        IsRed.IS_TRUE:
            car_is_red,  # A Step or Callable
        IsRed.IS_FALSE:
            lambda d, c: {"car": "Is not red."}  # Lambdas work too!
    }
)
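Here is a minimal, self-contained sketch of the dispatch idea behind a switch step: run an expression that returns an Enum member, then run the case mapped to that member. EnumSwitch and car_color are hypothetical names, and this is not PipeLayer's Switch implementation.

```python
from enum import Enum
from typing import Any, Callable, Dict

# A step is anything callable as step(data, context)
Step = Callable[[Any, Any], Any]


class IsRed(Enum):
    IS_TRUE = True
    IS_FALSE = False


def car_color(data: dict, context: Any) -> IsRed:
    # Hypothetical expression step: inspects the data and returns an Enum member
    return IsRed.IS_TRUE if data.get("color") == "red" else IsRed.IS_FALSE


class EnumSwitch:
    """Hypothetical switch step; not PipeLayer's Switch class."""

    def __init__(self, expression: Step, cases: Dict[Enum, Step]) -> None:
        self.expression = expression
        self.cases = cases

    def run(self, data: Any, context: Any = None) -> Any:
        key = self.expression(data, context)
        if not isinstance(key, Enum):
            raise TypeError("The switch expression must return an Enum member")
        # Look up the matching case and run it with the same data and context
        return self.cases[key](data, context)


switch = EnumSwitch(car_color, {
    IsRed.IS_TRUE: lambda d, c: {"car": "Is red."},
    IsRed.IS_FALSE: lambda d, c: {"car": "Is not red."},
})
print(switch.run({"color": "blue"}))  # {'car': 'Is not red.'}
```

The isinstance check mirrors the "expression must return an Enum" rule. Relaxing it to any hashable key would be one way to add the flexibility mentioned below.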
Under the hood, I needed to make the step init functions public, so I moved them out of the Pipeline and into a new StepHelper class. That was easy, since they were already static functions!
It took a bit more code reorganization to clean up the API namespace. I created interfaces for all objects to avoid some PITA circular references where the actual object wasn’t needed. That’s what I get for wanting to expose all the core classes in the root __init__.py.
For the first version of the Switch filter, I decided to require the expression to return an Enum. It shouldn’t be too difficult to add some flexibility to the expression return type.
Stay tuned, there’s some fun stuff coming out in the next version. 😎
There is only one notable change in 0.3.1: the pipelayer.Step base class was converted to an interface (a Protocol). This means that for your implementation, all you need to do is create a class that implements a run method with this signature:
from typing import Any


class MyFilter:
    def run(self, data: Any, context: Context) -> Any:  # Context: your app's context type
        ...
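To show what "converted to a Protocol" buys you, here is a self-contained sketch using the standard library's typing.Protocol. StepLike is a hypothetical stand-in for pipelayer.Step, and run_steps is an illustrative runner. The classes never inherit from the protocol but still satisfy it structurally.

```python
from typing import Any, List, Protocol, runtime_checkable


@runtime_checkable
class StepLike(Protocol):
    """Hypothetical stand-in for pipelayer.Step: anything with run(data, context)."""

    def run(self, data: Any, context: Any) -> Any:
        ...


class Shout:
    # No base class: it satisfies StepLike purely by having a matching run method
    def run(self, data: Any, context: Any) -> Any:
        return str(data).upper()


class Exclaim:
    def run(self, data: Any, context: Any) -> Any:
        return f"{data}!"


def run_steps(steps: List[StepLike], data: Any, context: Any = None) -> Any:
    # Each step's output becomes the next step's input
    for step in steps:
        data = step.run(data, context)
    return data


print(isinstance(Shout(), StepLike))  # True
print(run_steps([Shout(), Exclaim()], "hello"))  # HELLO!
```

Note that a runtime_checkable isinstance check only confirms that a run attribute exists. It does not validate the signature, which is left to a static type checker such as mypy.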
As I continued to work on the microservice example (using v0.3.0), I found I wasn’t creating filters by subclassing pipelayer.Filter. Doing so would have meant creating a separate class for each filter, so any logical grouping would live in the project’s directory structure rather than in the domain namespace. For example:
import json

import requests
from pipelayer import Filter

from service.config import AppContext


class UserRequest(Filter):
    @staticmethod
    def run(request: UserRequest, context: AppContext) -> dict:
        req = (f"{context.settings.resreq_api}/"
               f"{request.api_name}/{request.id}")
        resreq_resp = requests.get(req)
        user = json.loads(resreq_resp.content)
        context.log.info("User received from ResReq")
        return user
as opposed to this:
from pipelayer import Filter

from service.config import AppContext
from service.model.resreq_model import ResReqModel


class MapResReq(Filter):
    @staticmethod
    def from_resreq_api_response(response: dict,
                                 context: AppContext) -> ResReqModel:
        resreq = ResReqModel.parse_obj(response)
        return resreq
A subtle difference, but here’s how these filters would look in a pipeline:
Since class names are generally nouns and functions are verbs, I think using a custom static method reads more clearly.
Now, one of the benefits of subclassing Filter is that it can enforce a separation of concerns. With the static method approach, it would be very easy to create a single class holding every filter in your app.
NOTE: If your list of filters can change at runtime, implementing the pipelayer.Step interface (or sub-classing pipelayer.Filter) is the easiest way to implement a pipeline.
As soon as I’d published the last version to PyPI, I started on the next round of improvements. Both the Pipeline and Filter classes had a run method (with different signatures, though), so how hard would it be to nest a Pipeline alongside the filters? It turns out, not nearly as difficult as I’d feared. In the Pipeline, I swapped the arguments of the constructor and the run method (the steps now go to the constructor, while the data and context go to run), and created a new base class for Pipelines and Filters. Voila! I took another pass at refactoring the run methods and removed the base Settings class (as it’s really an implementation concern).
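Giving pipelines and filters a common base class with the same run method is essentially the composite pattern: a pipeline is just another step, so it can be nested inside another pipeline. This is a minimal sketch of that idea. Step, FuncFilter and NestablePipeline are hypothetical names, not PipeLayer’s classes.

```python
from abc import ABC, abstractmethod
from typing import Any, Callable, List


class Step(ABC):
    """Hypothetical common base class shared by filters and pipelines."""

    @abstractmethod
    def run(self, data: Any, context: Any = None) -> Any:
        ...


class FuncFilter(Step):
    # Wraps a plain (data, context) function as a Step
    def __init__(self, func: Callable[[Any, Any], Any]) -> None:
        self.func = func

    def run(self, data: Any, context: Any = None) -> Any:
        return self.func(data, context)


class NestablePipeline(Step):
    # Steps go to the constructor; data and context go to run
    def __init__(self, steps: List[Step], name: str = "") -> None:
        self.name = name
        self.steps = steps

    def run(self, data: Any, context: Any = None) -> Any:
        for step in self.steps:
            data = step.run(data, context)  # a step may itself be a pipeline
        return data


inner = NestablePipeline(
    [FuncFilter(lambda d, c: d + 1), FuncFilter(lambda d, c: d * 10)], name="inner"
)
outer = NestablePipeline([inner, FuncFilter(lambda d, c: d - 3)], name="outer")
print(outer.run(1))  # 17
```

The outer pipeline never needs to know that its first step is itself a pipeline. It only relies on the shared run signature.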
Once the tests were passing, it was time to create a “real-world” app. I got a connexion/flask microservice up and running fairly quickly with two endpoints.
Here’s what creating and running a simple pipeline looks like now:
# The pipeline
get_users_pipeline: Pipeline = Pipeline(
    name="Get Users",
    steps=[
        query_users_pipeline,  # <-- A nested pipeline!
        MapResReq.from_resreq_list_api_response,
        MapUser.from_resreq_list
    ]
)


# The api endpoint
def get_users(**kwargs):
    users = get_users_pipeline.run(kwargs, context)
    return users.dict()
And, one of the filters:
@staticmethod
def get_user(request: UserRequest, context: AppContext) -> dict:
    req = (
        f"{context.settings.resreq_api}/"
        f"{request.api_name}/{request.id}"
    )
    resreq_resp = requests.get(req)
    user = json.loads(resreq_resp.content)
    context.log.info("User received from ResReq")
    return user
As I built out each step in the pipeline, I found I had to rethink how to pass data from one step to the next (without any extra arguments) and without any knowledge of what the next step required (hint: make everything a model). On the testing side, I could see how this pattern would speed up writing tests and make it easier to adopt a TDD approach.
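Here is a small sketch of the "make everything a model" idea. Each step takes one model and returns another, so a step only depends on its input type and not on whatever comes next. The post’s examples appear to use model classes with parse_obj and .dict(). This sketch uses standard-library dataclasses instead to stay self-contained. The model names and the canned API payload are hypothetical, and fetch_user stands in for a real HTTP call.

```python
from dataclasses import dataclass
from typing import Any, Callable, List


@dataclass
class UserQuery:  # hypothetical request model
    user_id: int


@dataclass
class RawUser:  # hypothetical model wrapping an API response
    payload: dict


@dataclass
class User:  # hypothetical domain model
    id: int
    full_name: str


def fetch_user(query: UserQuery, context: Any) -> RawUser:
    # Stand-in for an HTTP call; returns canned data so the example runs offline
    return RawUser({"id": query.user_id, "first_name": "Jane", "last_name": "Doe"})


def map_user(raw: RawUser, context: Any) -> User:
    p = raw.payload
    return User(id=p["id"], full_name=f"{p['first_name']} {p['last_name']}")


def run(steps: List[Callable[[Any, Any], Any]], data: Any, context: Any = None) -> Any:
    for step in steps:
        data = step(data, context)  # each model flows into the next step
    return data


print(run([fetch_user, map_user], UserQuery(user_id=2)))
# User(id=2, full_name='Jane Doe')
```

Because each step’s input is just a model, a unit test can build that model directly and call the step on its own, for example map_user(RawUser({...}), None), with no pipeline or network involved.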
You’ll find the latest package on PyPI, or check out the GitHub repo, which has the flask app in the “examples” directory.
StringBender is a small collection of case-conversion functions… well, actually it’s more than that. It’s a fluent class that inherits from the built-in str.
from stringbender import (
camel, kebob, pascal, snake,
String
)
# =============================================
# EXAMPLES # OUTPUT
s = "Hasta la vista baby"
print(camel(s)) # hastaLaVistaBaby
print(kebob(s)) # hasta-la-vista-baby
print(pascal(s)) # HastaLaVistaBaby
print(snake(s)) # hasta_la_vista_baby
# =============================================
# Chaining a StringBender method with a str method
# Create an instance of stringbender.String:
s = String("vote*for*pedro")
# Check the default output:
print(s.camel())
# vote*For*Pedro (hmm... this isn't right)
# Replace the custom delimiter with a space first:
print(s.replace("*", " ").camel())
# voteForPedro (Much better!)
# =============================================
# Using a list of delimiters
s = snake(
"Careful man, there's a beverage here!",
delimiters=[",", "'", "!"]
)
print(s)
# careful_man_there_s_a_beverage_here
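For a sense of what "a fluent class that inherits from the built-in str" involves, here is a hypothetical sketch (FluentStr is an illustrative name, not StringBender’s code). Built-in methods such as str.replace return a plain str, so a subclass has to override them to keep a chain going. The word-splitting rules here are assumptions and won’t match StringBender’s output in every case.

```python
import re
from typing import Iterable, List, Optional


class FluentStr(str):
    """Hypothetical str subclass whose methods return FluentStr, enabling chaining."""

    def words(self, delimiters: Optional[Iterable[str]] = None) -> List[str]:
        text = str(self)
        # Treat each extra delimiter as whitespace before splitting
        for d in delimiters or []:
            text = text.replace(d, " ")
        return [w for w in re.split(r"[\s_\-]+", text) if w]

    def replace(self, old: str, new: str, count: int = -1) -> "FluentStr":
        # str.replace returns a plain str; wrap it so the chain continues
        return FluentStr(super().replace(old, new, count))

    def camel(self) -> "FluentStr":
        parts = self.words()
        if not parts:
            return FluentStr("")
        return FluentStr(parts[0].lower() + "".join(p.capitalize() for p in parts[1:]))

    def snake(self, delimiters: Optional[Iterable[str]] = None) -> "FluentStr":
        return FluentStr("_".join(w.lower() for w in self.words(delimiters)))


print(FluentStr("Hasta la vista baby").camel())  # hastaLaVistaBaby
print(FluentStr("vote*for*pedro").replace("*", " ").camel())  # voteForPedro
print(FluentStr("Careful man, there's a beverage here!").snake([",", "'", "!"]))
# careful_man_there_s_a_beverage_here
```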
As I was creating some “real-world” scenarios to test, I realized that requiring filters to derive from the Filter class wasn’t necessary. As long as a static method, module-level function, or lambda had the same signature, it could easily be added to a pipeline. Stay tuned: version 0.3.0 was passing tests within an hour of releasing version 0.2.0.
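Here is a minimal sketch of that idea, using a hypothetical run_pipeline helper (not PipeLayer’s runner). When a pipeline simply calls each step with (data, context), no base class is needed, and module-level functions, static methods and lambdas can all be mixed in one list.

```python
from typing import Any, Callable, List

Step = Callable[[Any, Any], Any]  # any callable taking (data, context)


def add_greeting(data: str, context: Any) -> str:  # module-level function
    return f"Hello, {data}"


class Formatters:
    @staticmethod
    def exclaim(data: str, context: Any) -> str:  # static method
        return f"{data}!"


def run_pipeline(steps: List[Step], data: Any, context: Any = None) -> Any:
    # No base class required: every step is just called with (data, context)
    for step in steps:
        data = step(data, context)
    return data


steps: List[Step] = [
    add_greeting,
    Formatters.exclaim,
    lambda data, context: data.upper(),  # lambda
]
print(run_pipeline(steps, "world"))  # HELLO, WORLD!
```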
After two years of working on a few microservices projects (mostly Python-based), with varying architectures and program flow, and after lengthy brainstorming discussions with one of the client engineers on my current project, I started to come up with a rough idea of a generic, reusable framework. Within a week of writing the first line of code, I had a working prototype, and quickly* published pipelayer 0.1.0, a lightweight pipeline framework based on the pipes/filters pattern.
Here’s a sample:
from app_context import AppContext
from app_settings import AppSettings
from pipelayer import Pipeline
from hello_filter import HelloFilter
from world_filter import WorldFilter
from logging import getLogger
app_settings = AppSettings()
app_context = AppContext(
app_settings,
getLogger(__name__)
)
pipeline = Pipeline.create(
app_context,
"Hello World Pipeline"
)
output = pipeline.run([
HelloFilter(),
WorldFilter()
])
print(f"Pipeline Output: {output}")
print(pipeline.manifest.__dict__)
More documentation for this version is available on PyPI.
* quickly (adv): A relative term as it applies to first-time development of a Python package: hours of research, countless executions of setup.py, more research, a few runs of installing the package from the local wheel, and finally, success!
I’m not sure what this is or what it will become. For now, it’s just a random page in the ether for me to share some stuff, most likely about Software Engineering topics. Python is my current jam. I thought I would hate it after 15 years of C#, and while it’s a twisty maze of passages, I’m having a blast getting lost (and found).