Node#

class pypeworks.Node(callable: Callable[[Ps], Any], returns: _ArgsAlias = None, join: bool = False, join_groupby: list[str] = None, join_flatten: bool = False, pass_self_as: str = None)#

Represents a processing unit of a Pipework. A node wraps a callable, which is invoked whenever data is pushed to the node, together with various configuration attributes.

Fundamentals#

__init__(callable: Callable[[Ps], Any], returns: _ArgsAlias = None, join: bool = False, join_groupby: list[str] = None, join_flatten: bool = False, pass_self_as: str = None)#

Sets up a node.

Parameters#

callable

A reference to a callable that will be called when any data is forwarded to this node.

Tip

When passing a lambda function, you may type hint it by subscripting Node with the parameter types when instantiating the node:

Node[str, int](
    lambda s, i: s + str(i)
)

Added in version 0.2.0.

returns

Specifies the return types of the callable using the structures provided by the typing submodule, e.g.:

Pipework(

    gen = Node(
        lambda _: random.randint(1, 100),
        returns = Args[Param[int, "x"]]
    ),

    double = Node(
        lambda x: x * 2
    ),

    connections = [
        Connection("gen", "double")
    ]

)

join

Whether to join all received data together before processing it. For example:

Pipework(

    step0 = Node(
        lambda _: iter(range(1, 3 + 1))
    ),

    step1 = Node(
        lambda inputs: sum(inputs), # Result: 6
        join = True
    ),

    connections = [
        Connection("enter", "step0"),
        Connection("step0", "step1"),
        Connection("step1", "exit")
    ]
)

Without join, step1 would process each input generated by step0 separately, and each call would raise an error because sum cannot iterate over an integer. With join enabled, the inputs from step0 are collected in a list before being passed to step1, which then processes them together by summing the list.

  • Joins are only carried out when all data in the pipeline has reached or passed the node set up to join data. If any data is still being processed by a preceding node, processing by this node is delayed. This also applies to data fed to the enter node: if data is streamed into the pipework, joins are delayed until all data has been put into the pipework. Because of this, joining is incompatible with infinite generators.
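
To make the effect of join concrete, the following plain-Python sketch mimics the behavior described above without using pypeworks itself. The helper join_inputs is hypothetical and not part of the library; it only illustrates how inputs received one by one could be collected into one list per parameter before the callable runs.

```python
from collections import defaultdict


def join_inputs(inputs):
    """Collect per-input keyword arguments into one list per parameter.

    Hypothetical helper for illustration only; not part of pypeworks.
    """
    joined = defaultdict(list)
    for kwargs in inputs:
        for name, value in kwargs.items():
            joined[name].append(value)
    return dict(joined)


# Three inputs arriving separately, as step0 would emit them.
received = [{"inputs": 1}, {"inputs": 2}, {"inputs": 3}]

# The callable of step1 from the example above.
step1 = lambda inputs: sum(inputs)

joined = join_inputs(received)
total = step1(**joined)
```

Calling step1 on any single entry of received would fail, since sum cannot iterate over an integer; calling it on the joined lists succeeds.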

join_groupby

Parameters by which to group the inputs for the other parameters. By default, a join loads each parameter with a list consisting of one entry for each input received. These lists are indexed by the order in which the inputs were processed. Accordingly, the values that a sending node sent together can be found at the same index in each list.

Stating a groupby applies a different logic. First, the parameters included in the groupby are deduplicated, so that each combination of their values occurs only once. Second, the parameters not included in the groupby are loaded as lists of lists. The indices of their outer lists correspond with the indices of the parameters included in the groupby, while each inner list aggregates all the inputs that share the same values for those parameters. Consider the following example:

Pipework(

    split = Node(
        lambda s: iter(s.split(";"))
    ),

    length = Node(
        lambda chunk: (len(chunk), chunk),
        returns = Args[Param[int, "len"], Param[str, "chunk"]]
    ),

    combine = Node(
        lambda len, chunk: {"len": len, "chunk": chunk},
        join = True,
        join_groupby = ["len"]
    ),

    connections = [
        Connection("enter", "split"),
        Connection("split", "length"),
        Connection("length", "combine"),
        Connection("combine", "exit")
    ]

)
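
The grouping described for join_groupby can be sketched in plain Python. The helper below is hypothetical and not part of pypeworks; it assumes that every input carries the same parameter names and that groups are kept in first-seen order, neither of which is confirmed by this page. The sample data corresponds to feeding the string "a;bb;cc;d" through split and length.

```python
def join_groupby(inputs, groupby):
    """Group joined inputs by the given parameters (illustrative sketch).

    Hypothetical helper, not part of pypeworks. Groups are kept in
    first-seen order, which is an assumption made for this sketch.
    """
    groups = {}  # key tuple -> {non-grouping parameter: [values]}
    for kwargs in inputs:
        key = tuple(kwargs[name] for name in groupby)
        bucket = groups.setdefault(key, {})
        for name, value in kwargs.items():
            if name not in groupby:
                bucket.setdefault(name, []).append(value)

    # Grouping parameters become flat, deduplicated lists; every other
    # parameter becomes a list of lists aligned with those keys.
    result = {name: [] for name in groupby}
    for key, bucket in groups.items():
        for name, value in zip(groupby, key):
            result[name].append(value)
        for name, values in bucket.items():
            result.setdefault(name, []).append(values)
    return result


received = [
    {"len": 1, "chunk": "a"},
    {"len": 2, "chunk": "bb"},
    {"len": 2, "chunk": "cc"},
    {"len": 1, "chunk": "d"},
]
grouped = join_groupby(received, ["len"])
```

Under these assumptions, grouped["len"] holds the unique lengths and grouped["chunk"] holds, at the same index, the list of chunks that have that length.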

join_flatten

Flag that indicates whether grouped input should be flattened. When input is flattened, a single value is chosen for each non-grouping parameter instead of a list of values. Consider the following example:

Pipework(

    gen = Node(
        lambda _: (random.randint(1, 100) for _ in range(0, 100))
    ),

    pow = Node(
        lambda x: (x, x ** 2),
        returns = Args[Param[int, "x"], Param[int, "y"]]
    ),

    final = Node(

        lambda x, y: {"x": x, "y": y},

        join = True,
        join_groupby = ["x"],
        join_flatten = True
    ),

    connections = [
        Connection("enter", "gen"),
        Connection("gen", "pow"),
        Connection("pow", "final"),
        Connection("final", "exit")
    ]
)
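
As a rough illustration of flattening, the sketch below reduces each inner list of a grouped join to one value. The helper flatten_groups is hypothetical and not part of pypeworks. This page does not state which value is chosen, so taking the first element is purely an assumption; in the example above the choice does not matter, because y is fully determined by x.

```python
def flatten_groups(grouped, groupby):
    """Reduce each inner list of a grouped join to a single value.

    Hypothetical helper, not part of pypeworks. Taking the first
    element is an assumption; the source does not say which is chosen.
    """
    return {
        name: values if name in groupby else [inner[0] for inner in values]
        for name, values in grouped.items()
    }


# Grouped by "x": the value 3 was generated twice, the value 7 once.
grouped = {"x": [3, 7], "y": [[9, 9], [49]]}
flat = flatten_groups(grouped, ["x"])
```

After flattening, each non-grouping parameter is a flat list aligned index by index with the grouping parameter.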

pass_self_as

Optionally, the Node may pass a reference to itself to the callable. pass_self_as names the argument that holds this self-reference. By default, when no name is specified, the Node does not pass a self-reference to the callable.
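
The mechanism can be pictured with a toy wrapper. MiniNode below is a hypothetical stand-in written for this sketch, not the pypeworks Node class, and it assumes the self-reference is handed over as a keyword argument.

```python
class MiniNode:
    """Toy stand-in for a node; hypothetical, not the pypeworks class."""

    def __init__(self, callable, pass_self_as=None):
        self.callable = callable
        self.pass_self_as = pass_self_as

    def __call__(self, **kwargs):
        # Inject the self-reference only when a name was given.
        if self.pass_self_as is not None:
            kwargs[self.pass_self_as] = self
        return self.callable(**kwargs)


plain = MiniNode(lambda x: x * 2)
aware = MiniNode(lambda x, node: (x * 2, node), pass_self_as="node")

plain_result = plain(x=2)
aware_result = aware(x=2)
```

Here the callable of plain never sees the wrapper, while the callable of aware receives it under the name node.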

Methods#

root()#

Retrieves the ‘root’ of the node: the parent of parents at the highest level of the pipework.
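
One way to picture this lookup is as a walk up the chain of parents. The Unit class below is a hypothetical stand-in for nodes and nested pipeworks, not the pypeworks implementation; that a unit without a parent returns itself is an assumption of this sketch.

```python
class Unit:
    """Toy stand-in for a node or pipework; hypothetical, not pypeworks."""

    def __init__(self, name, parent=None):
        self.name = name
        self.parent = parent

    def root(self):
        # Follow parent links until a unit without a parent is reached.
        current = self
        while current.parent is not None:
            current = current.parent
        return current


outer = Unit("outer")
inner = Unit("inner", parent=outer)
node = Unit("node", parent=inner)

top = node.root()
```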

Properties#

parent: Pipework#

Pipework in which the node is embedded.