Basics#
Pypeworks relies on a number of foundational concepts to build up its pipeworks. This chapter explains these concepts, and illustrates how they have been translated into code.
Nodes#
A Node
is a processing unit, taking in data, transforming it, and returning it. In that
sense a node is similar to a typical programming function. In fact, at its core, Pypeworks relies
on functions to wrap a node’s inner logic, regardless of how nodes are declared, either on the fly:
passthrough = Node(
    lambda x: x
)
Or, as part of a class:
class ExamplePipework(Pipework):

    def passthrough(self, x):
        return x
Nodes may take multiple inputs, but require at least one input. Such inputs may be passed anonymously or by name, depending on the input requirements stated by the node:
sum = Node(
    lambda x, y: x + y
)
def sum(self, x : int, y : int):
    return x + y
By default any input received is assigned to the first parameter. To assign data to any subsequent parameters, the source of the input needs to provide hints on what kind of input is being forwarded. A prior node may do so by type hinting the return values:
make_pairs = Node(
    lambda xs: zip(xs[::2], xs[1::2]),
    returns = Args[Param[int, "x"], Param[int, "y"]]
)
def make_pairs(self, xs : list[int]) -> Args[Param[int, "x"], Param[int, "y"]]:
    yield from zip(xs[::2], xs[1::2])
As the above example reveals, this also implies that a node may return multiple values simultaneously, similar to how a Python function may return multiple values. There is one notable difference: a node must always name its return values when passing multiple outputs.
Tip
Lambda functions do not allow for type hinting. When instantiating a Node, you may pass these type hints as an annotation instead, providing full support for type checking, even when working with lambda functions:
Node[list[int]](
    lambda xs: zip(xs[::2], xs[1::2]),
    returns = Args[Param[int, "x"], Param[int, "y"]]
)
Furthermore, the above example also shows that nodes may generate multiple outputs for the same input. Do note, however, that nodes are not generators. The output of a node is consumed eagerly. So, if a node produces an infinite generator, its output will keep being consumed for as long as subsequent nodes are able to process it within the limits of the machine's memory.
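To make this concrete, consider a node wrapping a finite generator: every yielded value is consumed eagerly and forwarded as a separate output to the next node. The fragment below is a minimal sketch in the declaration style of the earlier examples; the node names numbers and double are purely illustrative:
numbers = Node(
    # Finite generator: each yielded value becomes a separate output.
    lambda _: (i for i in range(5))
),
double = Node(
    # Receives 0, 1, 2, 3 and 4, one by one.
    lambda x: x * 2
),
connections = [
    Connection("enter", "numbers"),
    Connection("numbers", "double")
]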
Connections#
A Connection
specifies the relation between two nodes. Each connection is defined by an
input and an output. When nodes are defined on the fly, these connections need to be specified
separately:
make_pairs = Node(
    lambda xs: zip(xs[::2], xs[1::2]),
    returns = Args[Param[int, "x"], Param[int, "y"]]
),
sum = Node(
    lambda x, y: x + y
),
connections = [
    Connection("make_pairs", "sum")
    # or: Connection(input = "make_pairs", output = "sum")
]
This is different when nodes are defined as part of a class. In this case, the declaration of either the input or the output needs to coincide with the declaration of the connection itself. As such, the input or the output can be inferred from the specification of the other:
def make_pairs(self, xs : list[int]) -> Args[Param[int, "x"], Param[int, "y"]]:
    yield from zip(xs[::2], xs[1::2])

@Pipework.connect(input = "make_pairs")
def sum(self, x : int, y : int):
    return x + y
In Pypeworks connections are not mutually exclusive. That is to say, a node may send its output to multiple nodes, and it may also receive input from multiple nodes. So, the following code is viable with Pypeworks:
make_pairs = Node(
    lambda xs: zip(xs[::2], xs[1::2]),
    returns = Args[Param[int, "x"], Param[int, "y"]]
),
sum = Node(
    lambda x, y: x + y
),
mul = Node(
    lambda x, y: x * y
),
pow = Node(
    lambda z: z ** 2
),
connections = [
    Connection("make_pairs", "sum"),
    Connection("make_pairs", "mul"),
    Connection("sum", "pow"),
    Connection("mul", "pow")
]
@Pipework.connect(input = "enter")
def make_pairs(self, xs : list[int]) -> Args[Param[int, "x"], Param[int, "y"]]:
yield from zip(xs[::2], xs[1::2])
@Pipework.connect(input = "make_pairs")
def sum(self, x : int, y : int):
return x + y
@Pipework.connect(input = "make_pairs")
def mul(self, x : int, y : int):
return x * y
@Pipework.connect(input = "sum")
@Pipework.connect(input = "mul")
def pow(self, z : int):
return x ** 2
Joining inputs#
As pipeworks operate asynchronously, inputs are by default processed one by one, as they are received. However, a node can be instructed to join the inputs, in which case it waits until all preceding nodes have sent their outputs. These can then be processed simultaneously:
sort = Node(
    lambda zs: list(sorted(zs)),
    join = True
),
connections = [
    Connection("sum", "sort"),
    Connection("mul", "sort")
]
@Pipework.join
@Pipework.connect(input = "sum")
@Pipework.connect(input = "mul")
def sort(self, zs : list[int]):
    return list(sorted(zs))
When the preceding nodes have named their outputs, these are passed by name as inputs:
make_pairs = Node(
    lambda xs: zip(xs[::2], xs[1::2]),
    returns = Args[Param[int, "x"], Param[int, "y"]]
),
sum = Node(
    lambda x, y: x + y,
    returns = Args[Param[int, "s"]]
),
mul = Node(
    lambda x, y: x * y,
    returns = Args[Param[int, "m"]]
),
sort = Node(
    lambda s = [], m = []: list(sorted(s + m)),
    join = True
),
connections = [
    Connection("make_pairs", "sum"),
    Connection("make_pairs", "mul"),
    Connection("sum", "sort"),
    Connection("mul", "sort")
]
@Pipework.connect(input = "enter")
def make_pairs(self, xs : list[int]) -> Args[Param[int, "x"], Param[int, "y"]]:
yield from zip(xs[::2], xs[1::2])
@Pipework.connect(input = "make_pairs")
def sum(self, x : int, y : int) -> Args[Param[int, "s"]]:
return x + y
@Pipework.connect(input = "make_pairs")
def mul(self, x : int, y : int) -> Args[Param[int, "m"]]:
return x * y
@Pipework.join
@Pipework.connect(input = "sum")
@Pipework.connect(input = "mul")
def sort(self, s : list[int], m : list[int]):
return list(sorted(s + m))
Grouping#
As multiple senders may provide inputs with the same name, each parameter of a joining node is fed a list of inputs, with one entry per input received. By default these lists are indexed by the order in which the inputs were received. Accordingly, for any sender, all of the data it sent may be found by accessing the arguments at the same index. This behaviour changes, however, if the node is instructed to join and group by any of the arguments:
sum = Node(
    lambda x, y: (x, y, x + y),
    returns = Args[Param[int, "x"], Param[int, "y"], Param[int, "s"]]
),
mul = Node(
    lambda x, y: (x, y, x * y),
    returns = Args[Param[int, "x"], Param[int, "y"], Param[int, "m"]]
),
sums_and_muls = Node(
    lambda x, y, s, m: zip(s[0], m[0]),
    join = True, join_groupby = ["x", "y"]
),
connections = [
    Connection("sum", "sums_and_muls"),
    Connection("mul", "sums_and_muls")
]
@Pipework.connect(input = "make_pairs")
def sum(self, x : int, y : int) -> Args[Param[int, "x"], Param[int, "y"], Param[int, "s"]]:
return x, y, x + y
@Pipework.connect(input = "make_pairs")
def mul(self, x : int, y : int) -> Args[Param[int, "x"], Param[int, "y"], Param[int, "m"]]:
return x, y, x * y
@Pipework.join(groupby = ["x", "y"])
@Pipework.connect(input = "sum")
@Pipework.connect(input = "mul")
def sums_and_muls(self, x : list[int], y : list[int], s : list[list[int]], m : list[list[int]]):
yield from zip(s[0], m[0])
In the above example, all inputs are grouped according to the x and y values included in their payload. Mind that the inputs for any parameters not selected for grouping are aggregated as a list of lists, whereby the inner lists are indexed by the order in which the inputs were received. If the receipt of multiple such inputs is unlikely, the node may additionally be instructed to flatten any joined data:
gen = Node(
    lambda ignore: ((i, i) for i in range(1, 10 + 1)),
    returns = Args[Param[int, "x"], Param[int, "y"]]
),
sum = Node(
    lambda x, y: (x, y, x + y),
    returns = Args[Param[int, "x"], Param[int, "y"], Param[int, "s"]]
),
mul = Node(
    lambda x, y: (x, y, x * y),
    returns = Args[Param[int, "x"], Param[int, "y"], Param[int, "m"]]
),
sums_and_muls = Node(
    lambda x, y, s, m: zip(s, m),
    join = True, join_groupby = ["x", "y"], join_flatten = True
),
connections = [
    Connection("enter", "gen"),
    Connection("gen", "sum"),
    Connection("gen", "mul"),
    Connection("sum", "sums_and_muls"),
    Connection("mul", "sums_and_muls")
]
@Pipework.connect(input = "enter")
def gen(self, ignore : Any) -> Args[Param[int, "x"], Param[int, "y"]]:
yield ((i, i) for i in range(1, 10 + 1))
@Pipework.connect(input = "gen")
def sum(self, x : int, y : int) -> Args[Param[int, "x"], Param[int, "y"], Param[int, "s"]]:
return x, y, x + y
@Pipework.connect(input = "gen")
def mul(self, x : int, y : int) -> Args[Param[int, "x"], Param[int, "y"], Param[int, "m"]]:
return x, y, x * y
@Pipework.join(groupby = ["x", "y"], flatten = True)
@Pipework.connect(input = "sum")
@Pipework.connect(input = "mul")
def sums_and_muls(self, x : list[int], y : list[int], s : list[int], m : list[int]):
yield from zip(s, m)
The instruction to flatten may also be given when the node was not instructed to group.
Do note that flattening is only carried out if this is actually possible. If multiple inputs with the same name were passed for the same index or grouping, the list will not be flattened. Instead, a warning will be given.
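For example, the sort node from the earlier joining example could be declared to flatten without grouping. The fragment below is a minimal sketch of that variant, assuming the same sum and mul nodes as before:
sort = Node(
    # With join_flatten enabled, s and m are passed as flat lists of values.
    lambda s = [], m = []: list(sorted(s + m)),
    join = True, join_flatten = True
),
connections = [
    Connection("sum", "sort"),
    Connection("mul", "sort")
]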
Conditional passthrough#
Lastly, it is important to know that connections operate conditionally. By default a connection is only activated when a non-None value is passed. Optionally, this conditionality can be made stricter by stating additional activation conditions:
even_pow = Node(
    # I only accept even numbers
    lambda x: x ** 2
),
connections = [
    Connection("enter", "even_pow", where = lambda x: x % 2 == 0)
]
@Pipework.connect(input = "enter", where = lambda x: x %% 2 == 0)
def even_pow(self, x : int):
# I only accept even numbers
return x ** 2
Such conditions can even be made greedy, meaning that a connection that is activated may block the sender from forwarding the same input to any subsequent nodes:
even_pow = Node(
    # I only accept even numbers
    lambda x: x ** 2
),
uneven_div = Node(
    # I only receive uneven numbers
    lambda x: 1 / x
),
connections = [
    Connection("enter", "even_pow", where = lambda x: x % 2 == 0, greedy = True),
    Connection("enter", "uneven_div")
]
@Pipework.connect(input = "enter", where = lambda x: x %% 2 == 0, greedy = True)
def even_pow(self, x : int):
# I only accept even numbers
return x ** 2
@Pipework.connect(input = "enter")
def uneven_div(self, x : int):
# I only receive uneven numbers
return 1 / x
Pipeworks#
A Pipework
can be considered the sum of nodes and connections, as becomes clear when any of the above examples is examined in full:
example_pipework = Pipework(
    passthrough = Node(
        lambda x: print(x) or x
    ),
    connections = [
        Connection("enter", "passthrough"),
        Connection("passthrough", "exit")
    ]
)
class ExamplePipework(Pipework):

    @Pipework.connect(input = "enter")
    @Pipework.connect(output = "exit")
    def passthrough(self, x):
        return print(x) or x
Do note that the notion of a “sum of nodes and connections” may be taken quite literally, in the sense that pipeworks may also serve as embeddable collections of nodes and connections.
Pipework(
    gen = Node(
        lambda _: range(1, 10 + 1)
    ),
    inner = ExamplePipework(),
    connections = [
        Connection("enter", "gen"),
        Connection("gen", "inner"),
        Connection("inner", "exit")
    ]
)
class ExamplePipework(Pipework):

    @Pipework.connect(input = "enter")
    def gen(self, _ : Any):
        yield from range(1, 10 + 1)

    @Pipework.connect(input = "gen")
    @Pipework.connect(output = "exit")
    @mixin(example_pipework)
    def inner():
        pass
Under the hood, however, pipeworks are more than a mere collection. They actually implement the logic required to activate the nodes and connections. Pipeworks do so in parallel, by scheduling nodes for execution across threads. That way, pipeworks seek to distribute the workload in such a way that as many nodes as possible run simultaneously. To safeguard thread safety, pipeworks continuously make deep copies of objects that are pushed around in the machine (unless instructed otherwise). Accordingly, pipeworks are not necessarily the most efficient choice for every use case. They shine where this overhead is negligible, i.e. when computational or I/O expenses are high.
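To give an impression of where this pays off, the fragment below sketches an I/O-bound pipework in the declaration style used throughout this chapter. It is merely illustrative: the node name slow_fetch is made up, and the sleep stands in for a slow operation such as a network call, during which other inputs can be scheduled on parallel threads:
import time

io_pipework = Pipework(
    slow_fetch = Node(
        # Stand-in for a slow I/O call; several of these may run concurrently.
        lambda x: (time.sleep(0.1), x)[1]
    ),
    connections = [
        Connection("enter", "slow_fetch"),
        Connection("slow_fetch", "exit")
    ]
)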