Pipelines

Node Types

A Conducto pipeline is a tree of Node’s. Parallel nodes have children and run them at the same time. Serial nodes have children and run them one after the other, stopping on errors. Exec nodes run a shell command inside a container.

An Exec node is done when its command finishes successfully. Serial and Parallel nodes have children, and are done when all the children are done.

Save this code to a file and run it. It will prompt you for your Conducto credentials, and then you’ll be able to view it on our app.

import conducto as co

def run():
   # Use co.Exec() to run a build step
   build_node = co.Exec("echo docker build, make, npm i, etc.")

   # Use co.Parallel() to run tests in parallel.
   test_node = co.Parallel()
   for name in ["app", "backend", "metrics"]:
      test_node[name] = co.Exec(f"echo run test for {name}")

   # Use co.Serial() to build then test.
   pipeline = co.Serial()
   pipeline["Build"] = build_node
   pipeline["Test"] = test_node
   return pipeline

if __name__ == "__main__":
   co.main()
# Call `python <file> run` to pretty-print the pipeline:
#     /
#     ├0 Build   echo docker build, make, npm i, etc.
#     └1 Test
#       ├ app   echo run test for app
#       ├ backend   echo run test for backend
#       └ metrics   echo run test for metrics
#
# Call `python <file> run --local` to run it locally.

Constructing the Tree

Pipelines are constructed by adding children to a Parallel or Serial node; children can be any kind of Node. Nest them arbitrarily deeply, as Conducto supports pipelines with millions of Nodes.

Write your pipelines in Python, no YAML or custom languages. From for loops to database queries, use any logic you want when creating your pipeline.

Add children by assigning into a Node like a dict, or use Nodes as context managers with a with statement.

Assign Nodes by direct dict assignment (Node.__setitem__).

import conducto as co
node = co.Parallel()
node["test1"] = co.Exec("echo run test 1")
node["test2"] = co.Exec("echo run test 1")
root = co.Serial()
root["Test"] = node

print(root.pretty())
# /
# └─0 Test
#   ├─ test1   echo run test 1
#   └─ test2   echo run test 1

Equivalently, use /-delimited paths to get or assign deep into a tree.

import conducto as co
root = co.Serial()
root["Test"] = co.Parallel()
root["Test/test1"] = co.Exec("echo run test 1")
root["Test/test2"] = co.Exec("echo run test 2")

Functions can return Nodes as well.

import conducto as co
def make_test_node():
    output = co.Parallel()
    output["test1"] = co.Exec("echo run test 1")
    output["test2"] = co.Exec("echo run test 2")
    return output
root = co.Serial()
root["Test"] = make_test_node()

Use Nodes as context managers for visual clarity, so your indentation matches the structure.

import conducto as co
with co.Serial() as root:
    with co.Parallel(name="Test"):
        co.Exec("echo run test 1", name="test1")
        co.Exec("echo run test 2", name="test2")

Resources and Environment

An Exec node runs a shell command inside a container, but there are additional parameters that specify how it runs. You may specify the image used for creating the container, how much memory it requires, environment variables, and more. For the full list, see conducto.Node.

All Nodes inherit these attributes from their parents. This simplifies specification and also makes it easier to visualize and change manually in the app.

import conducto as co
root = co.Parallel(cpu=2, env={"LOG_LEVEL": "1"})
root["Node1"] = co.Exec("echo I run with 2 cores, and with LOG_LEVEL=$LOG_LEVEL")

Calling Python Methods

The Exec node constructor, conducto.Exec, can be used to call plain Python methods.

import conducto as co

def leibniz(n: int) -> float:
    """
    Leibniz formula for π
      π = 4 - 4/3 + 4/5 - 4/7 + ... + 4/(2*n+1)

    See https://en.wikipedia.org/wiki/Leibniz_formula_for_π
    """
    pi = 0
    sign = 1
    for i in range(n):
        pi += sign * 4 / (2*i+1)
        sign *= -1
    return pi

def compute_pi(n: int) -> co.Serial:
    output = co.Serial(image=co.Image(copy_dir="."))
    output["Leibniz1"] = co.Exec(leibniz, n=n)
    output["Leibniz2"] = co.Exec(leibniz, n=n)
    return output

Exec nodes with a Python callable are translated to a shell command with form conducto <file_containing_func> <func> –<arg1>=<val1> –<arg2>=<val2> …. Calling func inside a container can be tricky: the path on your local file system is not the same as its path inside the container in which it will eventually run. An Exec node with callable requires that conducto.Image with path translation data be specified or inherited from a parent node. It then uses the information contained in the image to compute the correct relative path. Whether the conducto.Image includes files from the local file system or from a remote Git repo, Exec will deduce the correct relative path to use.

The shell command conducto <file> <func> is able to call any method inside <file> that does not start with an underscore. See conducto.main() for more details.

Exec will serialize the arguments according to the types on func’s arguments. It will infer from default values as well as from type hints. It uses a simple serialization system that supports simple types (bool, int, float, str) as well as lists of them. Complex types can be supported, but that feature still takes some work, so if you want this please contact us on Slack to talk about how we can help you through it. If no type is given or hinted, str is assumed.

Lazy Pipeline Creation

Many pipelines are hard or impossible to fully specify up front.

  • A data science pipeline downloads data in one step and processes in the next. The processing step may ignore data that has already been processed, and may parallelize depending on what data is downloaded. Making these decisions at pipeline generation time can be expensive or impossible.

  • A CI/CD pipeline deploys a service and later accesses it for testing. In many cases you can predefine your URLs, but more sophisticated setups can spawn services dynamically and autogenerated them. Writing pipelines to reference these upfront can require confusing levels of indirection, which can often be a source of bugs.

Conducto pipelines can be generated lazily using conducto.Lazy() allowing you to generate your nodes once the necessary data or state is available.

This can be done either with a Python callable or with a shell command similar to conducto.Exec. Here is an example with a Python callable.

import conducto as co

def pipe() -> co.Serial:
    root = co.Serial(image=co.Image(copy_dir="."))
    root["Deploy"] = co.Exec("echo Deploy service")
    root["Test"] = co.Lazy(test_service, num_tests=5)

    print(root.pretty())
    # /
    # ├─0 Deploy   echo Deploy service
    # └─1 Test
    #   ├─0 Generate   conducto test.py test_service --num_tests=5
    #   └─1 Execute

    return root

def test_service(url:str=None, num_tests=1) -> co.Parallel:
    if url is None:
        # Some deployment strategies will create a new service. In these cases you may
        # not know the URL ahead of time but it can be determined on-the-fly. We mock
        # such an example here.
        url = "https://example.com/look_up_url_at_runtime"
    output = co.Parallel()
    for i in range(num_tests):
        output[f"RunTest_{i}"] = co.Exec(f"Testing deployment at {url}")
    return output

if __name__ == "__main__":
    co.main()

Given a func that is type-hinted to return a co.Serial or a co.Parallel, return a co.Serial containing a pair of nodes. The first, Generate, runs func(*args, **kwargs) and prints out the resulting pipeline. The second, Execute, imports that pipeline into the current one and runs it. When the Generate node is Done, a callback is triggered which imports the generated pipeline into the Execute step.

The translation of the Python callable into a shell command for execution in the docker image is the same as described above for Calling Python Methods.

There are cases where a Python callable is inappropriate. The argument serialization may not suffice, or the paths inside the container may not have been detected correctly. Use conducto.Lazy with a command line as a str. The shell command should use the conducto command line tool to serialize the returned Node, but aside from that you may run any command you need.

When used with a shell command Lazy takes a user-specified command with no path inference.

import conducto as co

root = co.Serial()
root["Deploy"] = co.Exec("echo Deploy service")
root["Test"] = co.Lazy("conducto test.py --num-tests=5", node_type=co.Parallel)

print(root.pretty())
# /
# ├─0 Deploy   echo Deploy service
# └─1 Test
#   ├─0 Generate   conducto test.py --num-tests=5
#   └─1 Execute

Launch

conducto.Node.launch_local(self, use_shell=True, retention=7, run=False, sleep_when_done=False, prebuild_images=False)

Launch directly from python.

Parameters
  • use_shell – If True (default) it will connect to the running pipeline using the shell UI. Otherwise just launch the pipeline and then exit.

  • retention – Once the pipeline is put to sleep, its logs and Data will be deleted after retention days of inactivity. Until then it can be woken up and interacted with.

  • run – If True the pipeline will run immediately upon launching. Otherwise (default) it will stay Pending until the user starts it.

  • sleep_when_done – If True the pipeline will sleep – manager exits with recoverable state – when the root node successfully gets to the Done state.

  • prebuild_images – If True build the images before launching the pipeline.

Auto-main Convenience

Command-line helper that allows you from the shell to easily execute methods that return Conducto nodes.

conducto.main(variables=None, default=None, argv=None, env=None, cpu=None, gpu=None, mem=None, requires_docker=False, image: Union[None, str, conducto.image.Image] = None, filename=None, printer=<function pprint>)

Command-line helper that allows you from the shell to easily execute methods that return Conducto nodes.

Parameters
  • default – Specify a method that is the default to run if the user doesn’t specify one on the command line.

  • image – Specify a default docker image for the pipeline. (See also conducto.Image).

  • cpu, mem, requires_docker (env,) – Computational attributes to set on any Node called through co.main.

See Node Methods and Attributes for more details.

import conducto as co

def run() -> co.Serial:
    return co.Serial()

if __name__ == "__main__":
    co.main()

# Call 'python <file> run' to pretty-print the Node returned by `run()`.
# Call 'python <file> run --local` to launch the pipeline on your local host.