Native Functions

Overview

Command line interfaces are very common, which is why Conducto nodes run commands. But if your pipeline needs some custom code and you don't want to write a custom CLI for it, there's a shortcut. Just put your code in a function:

def myfunc(arg1, arg2):
    # your code here

Then initialize an Exec node with that function and its args.

parent["work miracles"] = co.Exec(myfunc, "my", "args")

Conducto will make sure that your function gets called, and you don't have to provide a command. For this to work, the function needs to be in the same language as the pipeline definition. We call these native functions.

This page will show you how to call them.

Conducto on Both Sides

If a node is going to call a native function, the conducto Python package needs to be installed in that node's image. You can install Python packages like this:

img = co.Image(image="myimage:latest",
               reqs_py=['conducto'],    # install conducto
               copy_dir='.')            # include the pipeline definition

You'll also need to put the pipeline definition (and any files it imports) in the image. In our example it's the same file that defines the pipeline, so we'll use copy_dir=".".

In a typical workflow it's up to you to run the command that creates a pipeline. Then Conducto handles the rest. This asymmetry makes it easy to think that something arcane is happening inside of your containers, but it's actually pretty simple:

Conducto is installed locally and in the container. It works the same way in either location.

To see what I mean, let's look at a pipeline that uses this syntax. (It doesn't need reqs_py because the default image already has Conducto installed.)

import conducto as co

def seive(n: int):
    """
    Print all of the prime numbers less than n
    """
    primes = []
    for i in range(2, n):
        if all([i % p for p in primes]):
            primes.append(i)
            print(i)

def primes_less_than(n: int) -> co.Serial:
    with co.Serial(image=co.Image(copy_dir=".")) as root:
        root["find primes"] = co.Exec(seive, n)
    return root

if __name__ == '__main__':
    co.main(default=primes_less_than)

This example may look familiar because it's similar to the example used in Debugging. A key difference is that instead of calling python and passing it sieve.py, the Exec node calls a python function.

If you create a pipeline from this definition you'll see that we haven't actually eliminated the command. Instead the web app shows this:

conducto pipeline_onefile.py seive --n=30

Conceptually, little has changed: The node is still running a shell command. What's different is that Conducto wrote the command based on the indicated function. When it encounters that command in a container it will call that function.

By the way, this pipeline definition can be found in our examples repo. If you're all set up, you'll find that you can use the conducto command to get the same results that a node would, even without first creating a pipeline.

$ conducto pipeline_onefile.py seive --n=30
    2
    3
    5
    ...

So the native function workflow goes like this:

  • When your definition creates an Exec node locally it generates a conducto ... command that indicates your file, function, and args.
  • When that node runs the command, the containerized instance of Conducto finds the file and calls the function with the args.
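The two steps above can be sketched in plain Python. This is a minimal illustration, not Conducto's implementation: build_command and run_command are hypothetical helpers, the lookup table of functions stands in for locating the file, and the sketch assumes every argument is an int.

```python
import shlex

def seive(n):
    """Return the primes less than n (same logic as the pipeline example)."""
    primes = []
    for i in range(2, n):
        if all(i % p for p in primes):
            primes.append(i)
    return primes

def build_command(filename, func, **kwargs):
    """Hypothetical helper: render a function call as a shell command string."""
    flags = [f"--{key}={shlex.quote(str(value))}" for key, value in kwargs.items()]
    return " ".join(["conducto", filename, func.__name__, *flags])

def run_command(command, functions):
    """Hypothetical helper: parse the command and call the named function."""
    _, _filename, func_name, *flags = shlex.split(command)
    kwargs = {}
    for flag in flags:
        key, _, value = flag[2:].partition("=")  # strip the leading "--"
        kwargs[key] = int(value)  # assumption: every argument is an int
    return functions[func_name](**kwargs)

# Step 1: "locally", turn the function and its args into a command.
command = build_command("pipeline_onefile.py", seive, n=30)
print(command)  # conducto pipeline_onefile.py seive --n=30

# Step 2: "in the container", turn the command back into a function call.
result = run_command(command, {"seive": seive})
print(result)
```

The point of the sketch is that the command string is the only thing that travels between the two sides, which is why the same file has to exist in both places.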

Parameter Types

You may have noticed that we're using type hints:

def seive(n: int):
    primes = []
    for i in range(2, n):
        ...

def primes_less_than(n: int) -> co.Serial:
    with co.Serial(image=co.Image(copy_dir=".")) as root:
        root["find primes"] = co.Exec(seive, n)
    return root

This is necessary because when you're running a command, everything is a string. If you don't want your arguments to be strings, Conducto needs to know so that it can call your function with the appropriate type.

If you remove the argument type hint, you'll find that n is no longer an int, which can cause problems downstream.

#def seive(n: int):
def seive(n):
    primes = []
    for i in range(2, n): # this now throws a TypeError
        ...

If you're seeing type-related errors in a native function, make sure it has the appropriate type hints on its parameters. Otherwise you'll have code that gets a string when it expects something else.
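To make the role of the type hint concrete, here is a rough sketch of hint-driven conversion using only the standard library. It is an assumption about the general technique, not a description of Conducto's internals: convert_args, hinted, and unhinted are hypothetical names introduced for illustration.

```python
import inspect

def convert_args(func, raw_kwargs):
    """Hypothetical helper: cast string arguments using the function's annotations."""
    params = inspect.signature(func).parameters
    converted = {}
    for name, raw in raw_kwargs.items():
        annotation = params[name].annotation
        if annotation is inspect.Parameter.empty:
            converted[name] = raw              # no hint: the value stays a string
        else:
            converted[name] = annotation(raw)  # e.g. int("5") -> 5
    return converted

def hinted(n: int):
    return list(range(2, n))

def unhinted(n):
    return list(range(2, n))

# With a hint, the string from the command line becomes an int.
print(hinted(**convert_args(hinted, {"n": "5"})))  # [2, 3, 4]

# Without a hint, range() receives a string and raises TypeError.
try:
    unhinted(**convert_args(unhinted, {"n": "5"}))
except TypeError as err:
    print("TypeError:", err)
```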

Node Parameters vs Function Parameters

Native function keyword arguments are expressed in the initializer for the Exec node type. Typically, that's where node parameters go. To avoid ambiguity, you'll need a different way to assign node parameters on nodes that call native functions. Node.set() makes this possible:

parent["work miracles"] = co.Exec(myfunc, "my", "args") # native function parameters
parent["work miracles"].set(cpu=4, mem=4)               # node parameters

If you like to use the context manager syntax, you'll probably run into this with the name kwarg.

with co.Serial() as parent:
    # this throws a TypeError because 'name' isn't a kwarg on 'myfunc'
    # obj = co.Exec(myfunc, name="one", mykwarg="A")

    obj = co.Exec(myfunc, 1, mykwarg="A")   # native function parameters come first
    obj.set(name="one")                     # node parameters come second

    # or you can avoid the kwarg altogether
    parent["two"] = co.Exec(myfunc, 2, mykwarg="B")

As you can see for the node called two, you can avoid the extra call to set by finding a different way to assign the name.
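The collision can be reproduced without Conducto. The sketch below assumes that extra keyword arguments are forwarded to the function unchanged. It uses a stand-in myfunc with a made-up signature and a hypothetical accepts helper to check whether a set of arguments fits that signature.

```python
import inspect

def myfunc(value, mykwarg="default"):
    """Stand-in native function; the signature is an assumption for this sketch."""
    return f"{value}:{mykwarg}"

def accepts(func, *args, **kwargs):
    """Return True if func could be called with these arguments."""
    try:
        inspect.signature(func).bind(*args, **kwargs)
        return True
    except TypeError:
        return False

# Only function parameters: the arguments fit the signature.
print(accepts(myfunc, 1, mykwarg="A"))              # True

# 'name' is a node parameter, not a parameter of myfunc, so binding fails.
print(accepts(myfunc, 1, name="one", mykwarg="A"))  # False
```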

Serializing Your Objects

As part of building the node's command, Conducto's CLI helper will convert your parameter objects to strings. Often, this works as you'd expect. The code below shows that int behaves nicely.

in_obj = 3
param = str(in_obj)  # 'str' happens by default
out_obj = int(param) # 'int' comes from the type hint
out_obj == 3         # our object is unchanged

But if the details of the string conversion aren't obvious, it's best to do them yourself. You can deserialize the argument from a string in the function definition.

import conducto as co
import json

def otherfunc(payload):
    payload = json.loads(payload)
    # do something

And you can serialize it to a string when the command is created.

payload = json.dumps({ "foo" : "bar" })
node["call otherfunc"] = co.Exec(otherfunc, payload)

The example above turns an object into a JSON string before passing it to the Exec node, and reconstructs the object from JSON once it's inside the function.
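A short standard-library sketch shows why the default string conversion is not always enough. The dictionary here is made-up sample data. The comparison is between str(), which produces Python's own notation, and the explicit json.dumps / json.loads pair.

```python
import json

original = {"foo": "bar", "count": 3}  # sample data for illustration

# str() produces Python repr syntax (single quotes), which is not valid JSON.
as_str = str(original)
try:
    json.loads(as_str)
    str_round_trips = True
except json.JSONDecodeError:
    str_round_trips = False

# json.dumps / json.loads is an explicit, reversible pair.
as_json = json.dumps(original)
restored = json.loads(as_json)

print(as_str)
print(as_json)
print(str_round_trips, restored == original)  # False True
```

Doing the conversion yourself on both sides means the function never has to guess which string format it was handed.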

Conclusion

If you need a node that calls custom code, then you have a choice:

  • Write a custom CLI, add it to an image, and call it like any other tool.
  • Package the custom code with the pipeline definition, and treat it like a native function.

This choice is ultimately a matter of style. If you're uncertain, ask yourself who is going to be viewing this pipeline.

Will they want to look at the definition and get a sense of the big picture without needing to chase down custom code in separate projects? Sometimes it's really nice to have everything in one place. If so, then native functions will probably help you paint them a clearer picture in the pipeline definition.

On the other hand, maybe they'd prefer a narrowly compartmentalized view: Just the code that ran please, and nothing more. A viewer like this might ignore the pipeline definition entirely and jump straight into whatever project is called by a failing node. By keeping that project small and separate, you'd be doing them a favor.

If you're still uncertain, it's worth talking about with your team. A pipeline that gives them the right perspective on the task at hand will be a hard place for bugs to hide.
