Basics#

Pypeworks relies on a number of foundational concepts to build up its pipeworks. This chapter explains these concepts, and illustrates how they have been translated into code.

Nodes#

A Node is a processing unit, taking in data, transforming it, and returning it. In that sense a node is similar to a typical programming function. In fact, at its core, Pypeworks relies on functions to wrap a node’s inner logic, regardless of how nodes are declared, either on the fly:

passthrough = Node(
    lambda x: x
)

Or, as part of a class:

class ExamplePipework(Pipework):
    def passthrough(self, x):
        return x

Nodes may take multiple inputs, but require at least one input. Such inputs may be passed anonymously or by name, depending on the input requirements stated by the node:

  • Instance
  • Template
sum = Node(
    lambda x, y: x + y
)
def sum(self, x : int, y : int):
    return x + y

By default any input received is assigned to the first parameter. To assign data to any subsequent parameters, the source of the input needs to provide hints on what kind of input is being forwarded. A prior node may do so by type hinting the return values:

  • Instance
  • Template
Node(
    make_pairs = Node(
        lambda xs: zip(xs[::2], xs[1::2]),
        returns = Args[Param[int, "x"], Param[int, "y"]]
    )
)
def make_pairs(self, xs : list[int]) -> Args[Param[int, "x"], Param[int, "y"]]:
    yield from zip(xs[::2], xs[1::2])

As the above example reveals, that implicitly also means that a node may return multiple values simultaneously. This is similar to how a Python function may present multiple return values. There is one notable difference: a node must always name its return values when passing multiple outputs.

Tip

By default lambda functions do not allow for type hinting. When instantiating a Node, you may pass these type hints as an annotation, providing full support for type checking, even when working with lambda functions:

Node[list[int]](
    lambda xs: zip(xs[::2], xs[1::2]),
    returns = Args[Param[int, "x"], Param[int, "y"]]
)

Furthermore, the above example also shows that nodes may generate multiple outputs for the same input. Do note, however, that nodes are not generators. The output of a node is consumed eagerly. So, if a node produces an infinite generator, its input will be taken as long as subsequent nodes are able to process them within the limits of the machine’s memory.

Connections#

A Connection specifies the relation between two nodes. Each connection is defined by an input and a output. When nodes are defined on the fly, these connections need to be specified separately:

make_pairs = Node(
    lambda xs: zip(xs[::2], xs[1::2]),
    returns = Args[Param[int, "x"], Param[int, "y"]]
),

sum = Node(
    lambda x, y: x + y
),

connections = [
    Connection("make_pairs", "sum")
    # or: Connection(input = "make_pairs", output = "sum")
]

This is different when nodes are defined as part of a class. In this case, the declaration of either the input or the output needs to coincide with the declaration of the connection itself. As such, the input or output can be deferred from the specification of the opposite:

def make_pairs(self, xs : list[int]) -> Args[Param[int, "x"], Param[int, "y"]]:
    yield from zip(xs[::2], xs[1::2])

@Pipework.connect(input = "make_pairs")
def sum(self, x : int, y : int):
    return x + y

In Pypeworks connections are not mutually exclusive. That is to say, a node may send its output to multiple nodes, and it may also receive input from multiple nodes. So, the following code is viable with Pypeworks:

  • Instance
  • Template
make_pairs = Node(
    lambda xs: zip(xs[::2], xs[1::2]),
    returns = Args[Param[int, "x"], Param[int, "y"]]
),

sum = Node(
    lambda x, y: x + y
),

mul = Node(
    lambda x, y: x * y
),

pow = Node(
    lambda z: z ** 2
),

connections = [
    Connection("make_pairs", "sum"),
    Connection("make_pairs", "mul"),
    Connection("sum", "pow"),
    Connection("mul", "pow")
]
@Pipework.connect(input = "enter")
def make_pairs(self, xs : list[int]) -> Args[Param[int, "x"], Param[int, "y"]]:
    yield from zip(xs[::2], xs[1::2])

@Pipework.connect(input = "make_pairs")
def sum(self, x : int, y : int):
    return x + y

@Pipework.connect(input = "make_pairs")
def mul(self, x : int, y : int):
    return x * y

@Pipework.connect(input = "sum")
@Pipework.connect(input = "mul")
def pow(self, z : int):
    return x ** 2

Joining inputs#

As pipeworks are operated asynchronously, by default inputs are processed one by one, as they are received. However, a node can be instructed to join the inputs, in which case it waits until all preceeding nodes have sent their outputs. These can then be processed simultaneously:

  • Instance
  • Template
sort = Node(
    lambda zs: list(sorted(zs)),
    join = True
),

connections = [
    Connection("sum", "sort"),
    Connection("mul", "sort")
]
@Pipework.join
@Pipework.connect(input = "sum")
@Pipework.connect(input = "mul")
def sort(self, zs : list[int]):
    return list(sorted(zs))

When the preceeding nodes have named their outputs, these are passed by name as inputs:

  • Instance
  • Template
make_pairs = Node(
    lambda xs: zip(xs[::2], xs[1::2]),
    returns = Args[Param[int, "x"], Param[int, "y"]]
),

sum = Node(
    lambda x, y: x + y,
    returns = Args[Param[int, "s"]]
),

mul = Node(
    lambda x, y: x * y,
    returns = Args[Param[int, "m"]]
),

sort = Node(
    lambda s = [], m = []: list(sorted(s + m)),
    join = True
),

connections = [
    Connection("make_pairs", "sum"),
    Connection("make_pairs", "mul"),
    Connection("sum"       , "sort"),
    Connection("mul"       , "sort")
]
@Pipework.connect(input = "enter")
def make_pairs(self, xs : list[int]) -> Args[Param[int, "x"], Param[int, "y"]]:
    yield from zip(xs[::2], xs[1::2])

@Pipework.connect(input = "make_pairs")
def sum(self, x : int, y : int) -> Args[Param[int, "s"]]:
    return x + y

@Pipework.connect(input = "make_pairs")
def mul(self, x : int, y : int) -> Args[Param[int, "m"]]:
    return x * y

@Pipework.join
@Pipework.connect(input = "sum")
@Pipework.connect(input = "mul")
def sort(self, s : list[int], m : list[int]):
    return list(sorted(s + m))

Grouping#

As multiple senders may provide the same named inputs, each parameter is fed a list of inputs, each as long as the number of inputs received. By default these lists are indexed by the order in which the inputs were received. Accordingly, for any sender, all the data sent may be found by accessing the arguments at the same index. However, this behaviour may be different if the node was instructed to join and group by any of the arguments:

  • Instance
  • Template
sum = Node(
    lambda x, y: (x, y, x + y),
    returns = Args[Param[int, "x"], Param[int, "y"], Param[int, "s"]]
),

mul = Node(
    lambda x, y: (x, y, x * y),
    returns = Args[Param[int, "x"], Param[int, "y"], Param[int, "m"]]
)

sums_and_muls = Node(

    lambda x, y, s, m: zip(s[0], m[0]),

    join         = True, join_groupby = ["x", "y"]
),

connections = [
    Connection("sum", "sums_and_muls"),
    Connection("mul", "sums_and_muls")
]
@Pipework.connect(input = "make_pairs")
def sum(self, x : int, y : int) -> Args[Param[int, "x"], Param[int, "y"], Param[int, "s"]]:
    return x, y, x + y

@Pipework.connect(input = "make_pairs")
def mul(self, x : int, y : int) -> Args[Param[int, "x"], Param[int, "y"], Param[int, "m"]]:
    return x, y, x * y

@Pipework.join(groupby = ["x", "y"])
@Pipework.connect(input = "sum")
@Pipework.connect(input = "mul")
def sums_and_muls(self, x : list[int], y : list[int], s : list[list[int]], m : list[list[int]]):
    yield from zip(s[0], m[0])

In the above example, all inputs are mapped according to the x and y values included in their payload. Mind that the inputs for any parameters not selected as aggregator are aggregated as list of lists, whereby the inner lists are indexed by the order in which the inputs were received. If the receipt of multiple such inputs is unlikely, the node may be additionally instructed to flatten any joined data:

  • Instance
  • Template
gen = Node(
    lambda ignore: ((i, i) for i in range(1, 10 + 1)),
    returns = Args[Param[int, "x"], Param[int, "y"]]
),

sum = Node(
    lambda x, y: (x, y, x + y),
    returns = Args[Param[int, "x"], Param[int, "y"], Param[int, "s"]]
),

mul = Node(
    lambda x, y: (x, y, x * y),
    returns = Args[Param[int, "x"], Param[int, "y"], Param[int, "m"]]
),

sums_and_muls = Node(

    lambda x, y, s, m: zip(s, m),

    join = True, join_groupby = ["x", "y"], join_flatten = True
),

connections = [
    Connection("enter", "gen"),
    Connection("gen", "sum"),
    Connection("gen", "mul"),
    Connection("sum", "sums_and_muls"),
    Connection("mul", "sums_and_muls")
]
@Pipework.connect(input = "enter")
def gen(self, ignore : Any) -> Args[Param[int, "x"], Param[int, "y"]]:
    yield ((i, i) for i in range(1, 10 + 1))

@Pipework.connect(input = "gen")
def sum(self, x : int, y : int) -> Args[Param[int, "x"], Param[int, "y"], Param[int, "s"]]:
    return x, y, x + y

@Pipework.connect(input = "gen")
def mul(self, x : int, y : int) -> Args[Param[int, "x"], Param[int, "y"], Param[int, "m"]]:
    return x, y, x * y

@Pipework.join(groupby = ["x", "y"], flatten = True)
@Pipework.connect(input = "sum")
@Pipework.connect(input = "mul")
def sums_and_muls(self, x : list[int], y : list[int], s : list[int], m : list[int]):
    yield from zip(s, m)

The instruction to flatten may also be given when the node was not instructed to group.

Do note that flattening is only carried out if this is actually possible. If multiple inputs with the same name were passed for the same index or grouping, the list will not be flattened. Instead a warning will given.

Conditional passthrough#

Lastly, it is important to know that connections operate conditionally. By default a connection is only activated when a non-none value is passed. Optionally, this conditionality can be further be made more strict by stating additional activation conditions:

  • Instance
  • Template
even_pow = Node(
    # I only accept even numbers
    lambda x: x ** 2
),

connections = [
    Connection("enter", "even_pow", where = lambda x: x %% 2 == 0)
]
@Pipework.connect(input = "enter", where = lambda x: x %% 2 == 0)
def even_pow(self, x : int):
    # I only accept even numbers
    return x ** 2

Such conditions can even be made greedy, meaning that any connection that is activated may block the sender from sending the input over any subsequent nodes:

  • Instance
  • Template
even_pow = Node(
    # I only accept even numbers
    lambda x: x ** 2
),

uneven_div = Node(
    # I only received uneven numbers
    lambda x: 1 / x
),

connections = [
    Connection("enter", "even_pow", where = lambda x: x %% 2 == 0, greedy = True)
]
@Pipework.connect(input = "enter", where = lambda x: x %% 2 == 0, greedy = True)
def even_pow(self, x : int):
    # I only accept even numbers
    return x ** 2

@Pipework.connect(input = "enter")
def uneven_div(self, x : int):
    # I only receive uneven numbers
    return 1 / x

Pipeworks#

A Pipework can be considered as the sum of nodes and connections, as becomes clear when any of the above examples are examined in full:

  • Instance
  • Template
example_pipework = Pipework(

    passthrough = Node(
        lambda x: print(x) or x
    ),

    connections = [
        Connection("enter", "passthrough"),
        Connection("passthrough", "exit")
    ]
)
class ExamplePipework(Pipework):

    @Pipework.connect(input = "enter")
    @Pipework.connect(output = "exit")
    def passthrough(self, x):
        return print(x) or x

Do note, that the notion of a “sum of nodes and connections” may be taken quite literal, in the sense that pipeworks may also serve as embedable collections of nodes and connections.

  • Instance
  • Template
Pipework(

    gen = Node(
        lambda _: range(1, 10 + 1)
    ),

    inner = ExamplePipework(),

    connections = [
        Connection("enter", "gen"),
        Connection("gen", "inner"),
        Connection("inner", "exit")
    ]
)
class ExamplePipework(Pipework):

    @Pipework.connect(input = "enter")
    def gen(self, _ : Any):
        yield from range(1, 10 + 1)

    @Pipework.connect(input = "gen")
    @Pipework.connect(output = "exit")
    @mixin(example_pipework)
    def inner()
        pass

Under the hood, however, pipeworks are more than a mere collection. They actually implement the logic required to activate the nodes and connections. Pipeworks do so in parallel, by scheduling nodes for execution across threads. That way, pipeworks seek to distribute the workload in such way that as many nodes as possible can be carried out simultaneously. To safeguard thread safety, pipeworks continuously makes deep copies of objects that are pushed around in the machine (unless instructed otherwise). Accordingly, pipeworks are not necessarily the most efficient for every use case. They shine where this overhead is neglible, i.e. when computational or I/O expenses are high.