GreaterThan, LLC

PipeLayer 0.7.0

A new event step_end was added in v0.7.0. It passes the manifest_entry and data in the event args.

This example shows how the step_end event could be used to log the manifest_entry for each step in a pipeline:

import logging
from pipelayer import Filter, Pipeline, PipelineEventArgs

logger = logging.getLogger()

class MyFilter(Filter):
    def run(self, data, context):
        return f"{data} has been modified"

# Handles the pipeline step_end event
def my_pipeline_step_end(obj: Pipeline, args: PipelineEventArgs):
    logger.info(args.manifest_entry)

my_filter = MyFilter()

my_pipeline = Pipeline(steps=[
    my_filter
])

my_pipeline.step_end += my_pipeline_step_end

output = my_pipeline.run("Data", None)

#  output is "Data has been modified"

in PipeLayer, PyPi, Python

Events All The Way Down

First things first: It’s been a little over a month since the first version of PipeLayer was published and to celebrate, it got a shiny new logo!

Events that were added to the Filter abstract class have been extended to the Pipeline and Switch by inheriting from the Filter abstract class. With the addition of these events, the pre/post process methods in the Filter class became somewhat redundant—removing them streamlined the pipeline runner.

from pipelayer import Filter, FilterEventArgs, Pipeline


class MyStep(Filter):
    def run(self, data = None, context = None):
        ...

def my_pipeline_start(o: Pipeline, e: FilterEventArgs):
    ...

my_pipeline = Pipeline([MyStep])
my_pipeline.start += my_pipeline_start

output = my_pipeline.run()

Also, you may have noticed the += operator used to add event handlers. This was added in v0.5.1. Now handlers can be added in a variety of ways:

my_filter.exit += my_event_handler
my_filter.exit.append(my_event_handler)
my_filter.exit = my_filter.exit + my_event_handler
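For the curious, here is a minimal sketch of how an event object could support all three forms. It is not PipeLayer's actual implementation: the Event class and the handler names are hypothetical, and the sketch only assumes that an event is a list of callables.

```python
class Event(list):
    """Hypothetical event: a list of handlers (not PipeLayer's real class)."""

    def __iadd__(self, handler):
        # Supports: event += handler
        self.append(handler)
        return self

    def __add__(self, handler):
        # Supports: event = event + handler (returns a new Event)
        new_event = Event(self)
        new_event.append(handler)
        return new_event

    def __call__(self, sender, args):
        # Raise the event: call each handler in the order it was added
        for handler in self:
            handler(sender, args)


calls = []

def first_handler(sender, args):
    calls.append(("first", args))

def second_handler(sender, args):
    calls.append(("second", args))

def third_handler(sender, args):
    calls.append(("third", args))

exit_event = Event()
exit_event += first_handler
exit_event.append(second_handler)
exit_event = exit_event + third_handler

# Raising the event calls all three handlers in order
exit_event(None, "done")
```

Subclassing list gives append for free, so only the two operators need to be defined.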

in PipeLayer, PyPi, Python

PipeLayer 0.5.0

This release introduces Filter Events, and Pipeline Actions.

These events are raised by the Filter object:

  • start
  • exit
  • end

They can be handled by attaching a callable to each event. The start and end events are raised automatically, but the exit event is triggered by setting the action in the event handler to either EXIT or SKIP.

Like this:

from pipelayer import Filter, FilterEventArgs, Pipeline, Action
from pipelayer.filter import raise_events

class MyFilter(Filter):
    @raise_events                # Raises the events
    def run(self, data, context):
        return f"{data} has been changed"

# Handles the filter start event
def my_filter_start(obj: Filter, args: FilterEventArgs):
    args.data = "Filter Skipped"
    args.action = Action.SKIP    # This will skip over the step

my_filter = MyFilter()
my_filter.start.append(my_filter_start)

my_pipeline = Pipeline(steps=[
    my_filter
])

output = my_pipeline.run("Some data...", None)

#  output is "Filter Skipped"

in PipeLayer, PyPi, Python

PipeLayer 0.4.1

It’s that time. Another week, another new version. This one has only one visible addition: the Switch filter. It works (and reads) very much like its counterparts in C# and JavaScript:

switch = Switch(
    CarColor, { # The Expression (implements Step), returns an enum
        IsRed.IS_TRUE:
            car_is_red,   # A Step or Callable
        IsRed.IS_FALSE:
            lambda d, c: {"car": "Is not red."} # Lambdas work too!
    }
)

Under the hood, I needed to make the step init functions public. I moved them out of the Pipeline and into a new StepHelper class—easy, they were already static functions!

It took a bit more code re-organization to clean up the API namespace. Interfaces for all objects were created to avoid some PITA circular references when the actual object wasn’t needed. That’s what I get for wanting to expose all the core classes in the root __init__.py.

For the first version of the Switch filter, I decided to require the expression to return an Enum. It shouldn’t be too difficult to add some flexibility to the expression return type.
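To make the dispatch idea concrete, here is a rough sketch in plain Python. It is not the PipeLayer Switch implementation: SimpleSwitch, is_red and the case handlers are hypothetical names, and the only assumption carried over from the post is that the expression returns an Enum member that selects the step to run.

```python
from enum import Enum


class IsRed(Enum):
    IS_TRUE = 1
    IS_FALSE = 2


class SimpleSwitch:
    """Hypothetical stand-in for a switch filter (not PipeLayer's Switch)."""

    def __init__(self, expression, cases):
        self.expression = expression  # callable returning an Enum member
        self.cases = cases            # maps Enum members to callables

    def run(self, data, context=None):
        label = self.expression(data, context)
        if not isinstance(label, Enum):
            raise TypeError("The expression must return an Enum member")
        # Run whichever case matches the label
        return self.cases[label](data, context)


def is_red(data, context):
    return IsRed.IS_TRUE if data["color"] == "red" else IsRed.IS_FALSE


switch = SimpleSwitch(
    is_red,
    {
        IsRed.IS_TRUE: lambda d, c: {"car": "Is red."},
        IsRed.IS_FALSE: lambda d, c: {"car": "Is not red."},
    },
)

red_result = switch.run({"color": "red"})
blue_result = switch.run({"color": "blue"})
```

Keying the cases on Enum members means a misspelled case fails as soon as the dictionary is built, instead of silently never matching.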

Stay tuned, there’s some fun stuff coming out in the next version. 😎

in PipeLayer, PyPi, Python

PipeLayer 0.3.1

There is only one notable change in 0.3.1: the pipelayer.Step base class was converted to an interface (a Protocol). This means for your implementation, all you need to do is create a class that implements a run method with this signature:

class MyFilter:
    def run(self, data: Any, context: Context) -> Any:
        ...
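As a rough illustration of what a Protocol buys you, the sketch below uses typing.Protocol from the standard library. The Step class here is a hypothetical stand-in written for this example, not the real pipelayer.Step, and Shout and NotAStep are made-up classes.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class Step(Protocol):
    """Hypothetical stand-in for the pipelayer.Step interface."""

    def run(self, data: Any, context: Any) -> Any:
        ...


class Shout:
    # No base class: having a matching run method is enough
    def run(self, data: Any, context: Any) -> Any:
        return str(data).upper()


class NotAStep:
    def execute(self, data: Any) -> Any:
        return data


def run_step(step: Step, data: Any, context: Any = None) -> Any:
    return step.run(data, context)


result = run_step(Shout(), "hello")
shout_is_step = isinstance(Shout(), Step)      # has a run method
other_is_step = isinstance(NotAStep(), Step)   # has no run method
```

Note that isinstance against a runtime_checkable Protocol only checks that a run attribute exists. The signature itself is only verified by a static type checker.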

As I continued to work on the microservice example (using v0.3.0), I found I was not creating filters by sub-classing pipelayer.Filter. Doing so would have meant creating a separate class for each filter, so any logical grouping would live in the directory structure of the project and not in the domain namespace. For example:

from service.config import AppContext

class UserRequest(Filter):

    @staticmethod
    def run(request: UserRequest, context: AppContext) -> dict:
        req = (f"{context.settings.resreq_api}/"
               f"{request.api_name}/{request.id}")
        resreq_resp = requests.get(req)
        user = json.loads(resreq_resp.content)
        context.log.info("User received from ResReq")
        return user

as opposed to this:

from service.config import AppContext
from service.model.resreq_model import ResReqModel

class MapResReq(Filter):

    @staticmethod
    def from_resreq_api_response(response: dict, 
                                 context: AppContext) -> ResReqModel:
        resreq = ResReqModel.parse_obj(response)
        return resreq

A subtle difference, but here’s how these filters would look in a pipeline:

get_user_pipeline = Pipeline(
    name="Get User Pipeline",
    steps=[
        MapRequest.from_get_user_request,
        UserRequest,                         # pipelayer.Filter
        MapResReq.from_resreq_api_response,  # static method
        MapUser.from_resreq_model
    ]
)

Since class names are generally nouns and functions are verbs, I think using a custom static method reads more clearly.

Now, one of the benefits of implementing a sub-class of Filter is that it can enforce a separation of concerns. It would be very easy to create a single class with every filter in your app using the static method approach.

NOTE: If your list of filters can change at runtime, implementing the pipelayer.Step interface (or sub-classing pipelayer.Filter) is the easiest way to implement a pipeline.

in PipeLayer, PyPi, Python

PipeLayer 0.3.0

As soon as I’d published the last version to PyPi, I started on the next round of improvements. Both the Pipeline and Filter classes had a run method (with different signatures, though), so how hard would it be to nest a Pipeline alongside the filters? It turns out, not nearly as difficult as I’d feared. In the Pipeline, I swapped the pipeline constructor and run methods, and created a new base class for Pipelines and Filters. Voila! I also took another pass at refactoring the run methods and removed the base Settings class (as it’s really an implementation concern).
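The nesting idea can be sketched in a few lines. This is a simplified, hypothetical runner written for illustration (SimplePipeline and AddOne are not PipeLayer classes). It assumes only what is described above: steps are given to the constructor, run takes the data and context, and a pipeline exposes the same run method as a filter.

```python
class SimplePipeline:
    """Hypothetical runner: steps are set up front, run() takes the data."""

    def __init__(self, name, steps):
        self.name = name
        self.steps = steps

    def run(self, data, context=None):
        for step in self.steps:
            # A step is either an object with run(data, context), such as
            # a filter or another pipeline, or a plain callable.
            runner = step.run if hasattr(step, "run") else step
            data = runner(data, context)
        return data


class AddOne:
    def run(self, data, context=None):
        return data + 1


inner = SimplePipeline("Inner", [AddOne(), lambda d, c: d * 2])
outer = SimplePipeline("Outer", [inner, AddOne()])  # inner is just a step

output = outer.run(1)  # (1 + 1) * 2 + 1 = 5
```

Because the outer pipeline only ever calls run(data, context), it never needs to know whether a step is a filter, a function, or a whole pipeline.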

Fine, but Will It Blend?

Once the tests were passing it was time to create a “real-world” app. I got a connexion/flask microservice up and running fairly quickly with two endpoints.

Here’s what creating and running a simple pipeline looks like now:

# The pipeline
get_users_pipeline: Pipeline = Pipeline(
    name="Get Users",
    steps=[
        query_users_pipeline,  # <-- A nested pipeline!
        MapResReq.from_resreq_list_api_response,
        MapUser.from_resreq_list
    ]
)

# The api endpoint
def get_users(**kwargs):
    users = get_users_pipeline.run(kwargs, context)
    return users.dict()

And, one of the filters:

@staticmethod
def get_user(request: UserRequest, context: AppContext) -> dict:
    req = (
        f"{context.settings.resreq_api}/"
        f"{request.api_name}/{request.id}"
    )
    resreq_resp = requests.get(req)
    user = json.loads(resreq_resp.content)
    context.log.info("User received from ResReq")
    return user

As I built out each step in the pipeline, I found I had to rethink how to pass data (without any args) from one step to the next without having any knowledge of what the next step required (hint: make everything a model). For testing purposes, I could see how this new pattern would accelerate authoring tests, and help to adopt a TDD approach.
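Here is a small sketch of the "make everything a model" hint and why it helps with tests. It uses dataclasses from the standard library to stay self-contained. The model fields, the function names and the sample values are all made up for illustration and are not taken from the example app.

```python
from dataclasses import dataclass


@dataclass
class UserRequest:
    api_name: str
    id: int


@dataclass
class User:
    id: int
    full_name: str


def map_request(raw: dict, context=None) -> UserRequest:
    # Turn loose keyword arguments into a typed request model
    return UserRequest(api_name=raw["api_name"], id=int(raw["id"]))


def map_user(response: dict, context=None) -> User:
    # Turn an API response dict into the model the next step expects
    return User(
        id=response["id"],
        full_name=f"{response['first_name']} {response['last_name']}",
    )


# Each step can be exercised alone by handing it its input directly
request = map_request({"api_name": "users", "id": "2"})
user = map_user({"id": 2, "first_name": "Janet", "last_name": "Weaver"})
```

Since every step takes one input and returns one model, a test only has to build the input and compare the returned model. No other step needs to run.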

You’ll find the latest package at PyPi, or check out the GitHub repo which has the flask app in the “examples” directory.

in PipeLayer, PyPi, Python

StringBender 0.2.0

StringBender is a small collection of case-conversion functions… well, actually it’s more than that. It’s a fluent class that inherits from the built-in str.

from stringbender import (
    camel, kebob, pascal, snake, 
    String
)


# =============================================
# EXAMPLES                # OUTPUT

s = "Hasta la vista baby"
print(camel(s))           # hastaLaVistaBaby
print(kebob(s))           # hasta-la-vista-baby
print(pascal(s))          # HastaLaVistaBaby
print(snake(s))           # hasta_la_vista_baby

# =============================================
# A StringBender method with a str function

# Create an instance of stringbender.String:
s = String("vote*for*pedro")

# Check the default output:
print(s.camel())                     
# vote*For*Pedro (hmm... this isn't right)

# Pass in a custom delimiter:
print(s.replace("*", " ").camel())   
# voteForPedro (Much better!)


# =============================================
# Using a list of delimiters
s = snake(
    "Careful man, there's a beverage here!", 
    delimiters=[",", "'", "!"]
)
print(snake(s))                      
# careful_man_there_s_a_beverage_here

in PyPi, Python, StringBender

PipeLayer 0.2.0

As I was creating some “real-world” scenarios to test, I realized that requiring filters to derive from the Filter class was not necessary. As long as the method signature for a static/module/lambda was the same, any function could easily be added to a pipeline. Stay tuned, version 0.3.0 was passing tests within an hour of releasing version 0.2.0.

in PipeLayer, PyPi, Python

PipeLayer 0.1.0, Published to PyPi

After two years of working on a few microservices projects (mostly Python-based), with varying architectures and program flow, and after lengthy brainstorming discussions with one of the client engineers on my current project, I started to come up with a rough idea of a generic, reusable framework. Within a week after writing the first line of code, I had a working prototype, and quickly* published pipelayer 0.1.0, a lightweight pipeline framework based on the pipes/filters pattern.

Here’s a sample:

from app_context import AppContext 
from app_settings import AppSettings 
from pipelayer import Pipeline 
from hello_filter import HelloFilter 
from world_filter import WorldFilter 
from logging import getLogger


app_settings = AppSettings()
app_context = AppContext(
    app_settings, 
    getLogger(__name__)
)
pipeline = Pipeline.create(
    app_context, 
    "Hello World Pipeline"
) 
output = pipeline.run([
     HelloFilter(),
     WorldFilter()
])
print(f"Pipeline Output: {output}")
print(pipeline.manifest.__dict__)

More documentation for this version is available on PyPi.

* quickly (adv): A relative term as it applies to first-time development of a Python package: hours of research, countless executions of setup.py, more research, a few runs of installing the package from the local wheel, and finally, success!

in PipeLayer, PyPi, Python

Hello, world.

I’m not sure what this is or what it will become. For now, just a random page in the ether for me to share some stuff, most likely about Software Engineering topics. Python is my current jam–I thought I would hate it after 15 years of C#. While it’s a twisty maze of passages, I’m having a blast getting lost (and found).

You can also find me here:

Don’t mind the dust. I’m still getting re-acquainted with customizing WordPress themes. And, it’s pretty minimalist by design. Words are cool.

in Colossal Cave, Python