Pipework#

class pypeworks.Pipework(processes: int = None, connections: list[Connection] = None, logger: Logger = None, ignore_errors: Sequence[BaseException] = None, **nodes: dict[str, Node])#

A Pipework is a directed acyclic graph composed of nodes and connections. Each Node serves as a standalone processing unit that takes in data, transforms it, and outputs it. Connection objects move this data through the pipework until it reaches the exit node.

A Pipework may be declared through sub-classing, allowing it to be templated:

class ExamplePipework(Pipework):

    @Pipework.connect(input = "enter")
    @Pipework.connect(output = "exit")
    def passthrough(self, x):
        return x

A Pipework may also be declared on the fly, allowing it to be instantiated dynamically:

Pipework(

    passthrough = Node(lambda x: x),

    connections = [
        Connection("enter"       , "passthrough"),
        Connection("passthrough" , "exit"       )
    ]
)

Furthermore, both instantiation methods may be mixed:

class ExamplePipework(Pipework):

    @Pipework.connect(input = "enter")
    def prepare(self, x):
        return str(x)

ExamplePipework(

    passthrough = Node(lambda x: x),

    connections = [
        Connection("prepare"     , "passthrough"),
        Connection("passthrough" , "exit"       )
    ]
)

Fundamentals#

__init__(processes: int = None, connections: list[Connection] = None, logger: Logger = None, ignore_errors: Sequence[BaseException] = None, **nodes: dict[str, Node])#

Instantiates a new Pipework.

Parameters#

processes

The number of worker processes used to operate the Pipework. By default this number equals the number of logical CPUs in the system.

connections

The connections between the nodes in the Pipework.

logger

Logger compatible with Python’s logging module, providing control over the registration of events occurring within the Pipework.

Added in version 0.2.0.

ignore_errors

Sequence of errors (and exceptions) that, if raised, are ignored by the Pipework, allowing it to continue execution.

Added in version 0.2.0.

nodes

The nodes that make up the Pipework.

__call__(inputs: Iterator[T] | T = None) Iterator[R]#

Pushes data into the Pipework, returning a generator that may be used to retrieve the data processed by the pipework.

Tip

If you want this method to carry type hints, add annotations when instantiating the pipework itself:

Pipework[int, str](
    x # Shows as : int
) # Shows as -> str
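The call semantics can be sketched with a plain generator (a conceptual model only; pypeworks itself distributes the work over processes, which this sketch omits). The names `call` and `node` here are illustrative, not part of the pypeworks API:

```python
def call(inputs, node):
    # Push each input through the node, yielding results lazily,
    # much like the generator returned by Pipework.__call__.
    for x in inputs:
        yield node(x)

results = call([1, 2, 3], str)
first = next(results)   # results are pulled on demand
rest = list(results)
```

Because a generator is returned, downstream consumers can start reading results before all inputs have been processed.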

Decorators#

static connect(input: str = None, output: str = None, where: Callable[[Any], bool] = None, greedy: bool = False)#

Decorator to assign a method of a Pipework subclass as a node.

Parameters#

input

The node from which this node receives its input. Can only be defined if output has not been defined. Either input or output must be defined.

output

The node to which this node forwards its output. Can only be defined if input has not been defined. Either input or output must be defined.

where

A function that evaluates the input received, returning a boolean that states whether or not this input should be forwarded to the next node. For example:

@Pipework.connect(output = "process_str", where = lambda self, x: isinstance(x, str))
@Pipework.connect(output = "process_num", where = lambda self, x: isinstance(x, numbers.Number))
def example(self, _):

    yield 123              # Processed by 'process_num'
    yield "a"              # Processed by 'process_str'
    yield datetime.now()   # Not processed

Stating conditions using the where-argument has the benefit that data is not unnecessarily duplicated. Every time data is forwarded in a Pipework, that data is copied to ensure that different nodes, potentially operating in different threads, do not modify the same data. The condition stated by where is checked before copying, preventing needless duplication and reducing memory usage.
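This copy-avoidance logic can be sketched in plain Python (a conceptual illustration, not pypeworks' actual internals; `forward` and its argument shapes are hypothetical):

```python
import copy

def forward(value, connections):
    # For each (target, where) pair, copy the value only after the
    # predicate accepts it, so rejected targets cost no duplication.
    deliveries = []
    for target, where in connections:
        if where is None or where(value):
            deliveries.append((target, copy.deepcopy(value)))
    return deliveries

# 123 satisfies only the numeric predicate, so only one copy is made.
sent = forward(123, [
    ("process_str", lambda x: isinstance(x, str)),
    ("process_num", lambda x: isinstance(x, (int, float))),
])
```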

greedy

Determines whether data is forwarded only via this connection when possible (i.e. when the condition stated by where is met), or whether data is also forwarded via any subsequently declared connections attached to the sender.

Please note that greedy behaviour is affected by the order in which connections are declared. For example:

@Pipework.connect(output = "step3")
@Pipework.connect(output = "step2", where = lambda self, x: x < 3, greedy = True)
@Pipework.connect(output = "step1")
def step0(self, input):

    for i in range(0, 5):
        yield i

In the above example the numbers 0 through 4 will all be forwarded to step1, whereas step3 will only receive the numbers 3 and 4, with the preceding numbers, 0, 1 and 2, being claimed by the greedy connection to step2.
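The routing behaviour can be modelled in a few lines of plain Python (a conceptual sketch only; `route` and `conns` are illustrative names, not pypeworks internals). Connections are tried in declaration order, and a greedy match stops the later-declared ones from receiving the value:

```python
def route(value, connections):
    # connections: (name, where-predicate or None, greedy) in declaration order
    targets = []
    for name, where, greedy in connections:
        if where is None or where(value):
            targets.append(name)
            if greedy:
                break   # a greedy match claims the value for itself
    return targets

# Declaration order matches the decorator stack read bottom-up.
conns = [
    ("step1", None, False),
    ("step2", lambda x: x < 3, True),
    ("step3", None, False),
]

routed = {i: route(i, conns) for i in range(0, 5)}
```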

static join(groupby: list[str] = None, flatten: bool = False)#

Sets up the node to join all received data together before processing it. For example:

def step0(self, _):

    yield 1
    yield 2
    yield 3

@Pipework.join
@Pipework.connect(input = "step0")
def step1(self, inputs):

    return sum(inputs) # Result: 6

Without the join decorator, step1 would process each input generated by step0 separately, each attempt resulting in an error, as the sum function cannot iterate over an integer. With the join decorator, the inputs from step0 are grouped into a list before being passed to step1, which then processes the preceding inputs collectively by running sum over the list.
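The effect of a join can be sketched without pypeworks at all: the upstream generator is exhausted into a list before the downstream step runs once over the whole collection.

```python
def step0():
    # Stands in for the upstream node's generator output.
    yield 1
    yield 2
    yield 3

joined = list(step0())   # what the join decorator collects
total = sum(joined)      # what step1 then computes in a single call
```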

Parameters#

groupby

Parameters by which to group the inputs for the other parameters. By default a join loads each parameter with a list consisting of one entry for each input received. These lists are indexed by the order in which the inputs were processed. Accordingly, for any sending node, all the data sent by that node may be found by accessing the arguments at the same index. Stating a groupby applies a different logic. Firstly, the parameters included in the groupby are deduplicated, so that each combination of the given arguments is unique. Secondly, the parameters not included in the groupby are loaded as lists of lists. The outer lists’ indices correspond to the indices of the parameters included in the groupby, while the inner lists aggregate all the inputs sharing the same values for the parameters included in the groupby. Consider the following example:

@Pipework.connect(input = "enter")
def gen(self, _):
    yield from (random.randint(1, 100) for _ in range(0, 100))

@Pipework.connect(input = "gen")
def pow(self, x) -> Args[Param[int, "x"], Param[int, "y"]]:
    return (x, x ** 2)

@Pipework.connect(input = "pow")
@Pipework.connect(output = "exit")
@Pipework.join(groupby = ["x"])
def final(self, x : list[int], y : list[list[int]]):
    return zip(x, (next((z for z in ls if z), None) for ls in y))
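The grouping logic can be sketched in plain Python (a conceptual illustration of the semantics described above, not pypeworks internals; `join_groupby` is a hypothetical name):

```python
def join_groupby(pairs):
    # Deduplicate the grouping parameter x; aggregate y values per group.
    groups = {}
    for x, y in pairs:
        groups.setdefault(x, []).append(y)
    xs = list(groups)                 # unique values of the grouped parameter
    ys = [groups[x] for x in xs]      # list of lists, aligned with xs
    return xs, ys

# Duplicate x values collapse into one group; their y values are collected.
xs, ys = join_groupby([(2, 4), (3, 9), (2, 4)])
```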

flatten

Flag that indicates whether any grouped input should be flattened or not. When input is flattened, a singular value is chosen for each non-grouping parameter. Accordingly, the example for groupby could also have been implemented as follows:

@Pipework.connect(input = "enter")
def gen(self, _):
    yield from (random.randint(1, 100) for _ in range(0, 100))

@Pipework.connect(input = "gen")
def pow(self, x) -> Args[Param[int, "x"], Param[int, "y"]]:
    return (x, x ** 2)

@Pipework.connect(input = "pow")
@Pipework.connect(output = "exit")
@Pipework.join(groupby = ["x"], flatten = True)
def final(self, x : list[int], y : list[int]):
    return zip(x, y)
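Flattening can be sketched the same way as grouping (again a conceptual model with a hypothetical name, not pypeworks internals): instead of collecting a list per group, a single value is kept for each one.

```python
def join_groupby_flat(pairs):
    # Keep one y value per unique x instead of a list of them.
    groups = {}
    for x, y in pairs:
        groups.setdefault(x, y)   # first value wins for each group
    return list(groups), list(groups.values())

xs, ys = join_groupby_flat([(2, 4), (3, 9), (2, 4)])
```

Note that the choice of which single value to keep per group is an assumption of this sketch; consult the library's behaviour if the distinction matters.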

Notes#

  • Joins are only carried out once all data in the pipeline has reached or passed the node set up to join data. If any data is still being processed by a preceding node, processing by this node is delayed. This also applies to data fed to the enter node: if data is streamed into the pipework, joins are delayed until all data has been put into the pipework. Consequently, joining is incompatible with infinite generators.

static mixin(node: Node)#

Replaces the decorated method with the given Node, allowing runtime-defined nodes to be mixed into class-defined pipeworks:

sqrt = Node(
    lambda x: math.sqrt(x)
)

class Example(Pipework):

    @Pipework.connect(input = "enter")
    def pow(self, x : int):
        return x ** 2

    @Pipework.connect(input = "pow")
    @Pipework.connect(output = "exit")
    @Pipework.mixin(sqrt)
    def sqrt(self):
        pass
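The mechanism behind mixin can be sketched as an ordinary decorator that discards the placeholder function it decorates and substitutes the pre-built object (a conceptual sketch; the real decorator also carries pypeworks-specific node metadata):

```python
# Stands in for a runtime-defined Node wrapping a callable.
prebuilt_node = lambda x: x ** 0.5

def mixin(node):
    # Ignore the decorated placeholder entirely; return the given node.
    return lambda placeholder: node

@mixin(prebuilt_node)
def sqrt():
    pass   # body is irrelevant; it is replaced by prebuilt_node
```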