Tutorial: Simplifying Pipeline Inputs with Multiplexer


As of version 2.2.0, Multiplexer is deprecated in Haystack and will be removed completely in v2.4.0. We recommend using BranchJoiner instead. For more details about this deprecation, check out the Haystack 2.2.0 release notes on GitHub.

This tutorial uses Haystack 2.0. To learn more, read the Haystack 2.0 announcement or visit the Haystack 2.0 Documentation.

Overview

If you’ve ever built a Haystack pipeline with more than three or four components, you probably noticed that the number of inputs you pass to the pipeline's run() method keeps growing. New components take some of their input from other components in the pipeline, but many of them also require additional input from the user. As a result, the data passed to Pipeline.run() grows and becomes very repetitive.

There is one component that can help you manage this repetition more effectively: the Multiplexer.

In this tutorial, you will learn how to drastically simplify the Pipeline.run() of a RAG pipeline using a Multiplexer.

Setup

Prepare the Colab Environment

Install Haystack

Install Haystack 2.0 with pip:

%%bash

pip install haystack-ai "huggingface_hub>=0.23.0"

Enable Telemetry

Knowing you’re using this tutorial helps us decide where to invest our efforts to build a better product, but you can always opt out by commenting out the following line. See Telemetry for more details.

from haystack.telemetry import tutorial_running

tutorial_running(37)

Enter a Hugging Face API key

Set a Hugging Face API key:

import os
from getpass import getpass

if "HF_API_TOKEN" not in os.environ:
    os.environ["HF_API_TOKEN"] = getpass("Enter Hugging Face token:")

Indexing Documents with a Pipeline

Create a pipeline to store a small example dataset, together with its embeddings, in the InMemoryDocumentStore. You will use HuggingFaceAPIDocumentEmbedder to generate embeddings for your Documents and the DocumentWriter to write them to the document store.

After adding these components to your pipeline, connect them and run the pipeline.

If you’d like to learn about preprocessing files before you index them to your document store, follow the Preprocessing Different File Types tutorial.

from haystack import Pipeline, Document
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.components.writers import DocumentWriter
from haystack.components.embedders import HuggingFaceAPIDocumentEmbedder

documents = [
    Document(content="My name is Jean and I live in Paris."),
    Document(content="My name is Mark and I live in Berlin."),
    Document(content="My name is Giorgio and I live in Rome."),
    Document(content="My name is Giorgio and I live in Milan."),
    Document(content="My name is Giorgio and I lived in many cities, but I settled in Naples eventually."),
]

document_store = InMemoryDocumentStore()

indexing_pipeline = Pipeline()
indexing_pipeline.add_component(
    instance=HuggingFaceAPIDocumentEmbedder(
        api_type="serverless_inference_api", api_params={"model": "sentence-transformers/all-MiniLM-L6-v2"}
    ),
    name="doc_embedder",
)
indexing_pipeline.add_component(instance=DocumentWriter(document_store=document_store), name="doc_writer")

indexing_pipeline.connect("doc_embedder.documents", "doc_writer.documents")

indexing_pipeline.run({"doc_embedder": {"documents": documents}})

Building a RAG Pipeline

Build a basic retrieval-augmented generation (RAG) pipeline with HuggingFaceAPITextEmbedder, InMemoryEmbeddingRetriever, PromptBuilder and HuggingFaceAPIGenerator. Additionally, add AnswerBuilder to enrich the generated answer with meta info and the query input.

For a step-by-step guide to creating a RAG pipeline with Haystack, follow the Creating Your First QA Pipeline with Retrieval-Augmentation tutorial.

from haystack.components.embedders import HuggingFaceAPITextEmbedder
from haystack.components.retrievers.in_memory import InMemoryEmbeddingRetriever
from haystack.components.builders import PromptBuilder, AnswerBuilder
from haystack.components.generators import HuggingFaceAPIGenerator

template = """
 <|user|>
 Answer the question based on the given context.

Context:
{% for document in documents %}
    {{ document.content }}
{% endfor %}
Question: {{ question }}</s>
<|assistant|>
Answer:
"""
pipe = Pipeline()
pipe.add_component(
    "embedder",
    HuggingFaceAPITextEmbedder(
        api_type="serverless_inference_api", api_params={"model": "sentence-transformers/all-MiniLM-L6-v2"}
    ),
)
pipe.add_component("retriever", InMemoryEmbeddingRetriever(document_store=document_store))
pipe.add_component("prompt_builder", PromptBuilder(template=template))
pipe.add_component(
    "llm",
    HuggingFaceAPIGenerator(api_type="serverless_inference_api", api_params={"model": "HuggingFaceH4/zephyr-7b-beta"}),
)
pipe.add_component("answer_builder", AnswerBuilder())

pipe.connect("embedder.embedding", "retriever.query_embedding")
pipe.connect("retriever", "prompt_builder.documents")
pipe.connect("prompt_builder", "llm")
pipe.connect("llm.replies", "answer_builder.replies")
pipe.connect("llm.meta", "answer_builder.meta")
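
To get a feel for the text the LLM receives, here is a rough plain-Python approximation of the prompt that the template above produces. It is only a sketch: PromptBuilder renders the template with Jinja2, so exact whitespace will differ, and the Doc class and render_prompt function are hypothetical stand-ins invented for this illustration.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Doc:
    """Hypothetical stand-in for a Haystack Document; only `content` is modeled."""

    content: str


def render_prompt(documents: List[Doc], question: str) -> str:
    """Approximate the template: list each document's content, then the question."""
    # Mirrors the {% for document in documents %} loop: one line per document.
    context = "\n".join(doc.content for doc in documents)
    return (
        "<|user|>\n"
        "Answer the question based on the given context.\n\n"
        f"Context:\n{context}\n"
        f"Question: {question}</s>\n"
        "<|assistant|>\n"
        "Answer:\n"
    )


docs = [
    Doc("My name is Mark and I live in Berlin."),
    Doc("My name is Jean and I live in Paris."),
]
prompt = render_prompt(docs, "Where does Mark live?")
print(prompt)
```

Note that the question is a separate template variable, which is why prompt_builder needs the query as its own input in addition to the retrieved documents.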

Running the Pipeline

Pass the query to embedder, prompt_builder and answer_builder, then run the pipeline:

query = "Where does Mark live?"
pipe.run({"embedder": {"text": query}, "prompt_builder": {"question": query}, "answer_builder": {"query": query}})

In this basic RAG pipeline, the components that require a query to operate are embedder, prompt_builder, and answer_builder. However, as you extend the pipeline with additional components like Retrievers and Rankers, the number of components that need the query keeps growing. This leads to repetitive and increasingly complex Pipeline.run() calls. In such cases, using a Multiplexer can help simplify and declutter Pipeline.run().
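
The repetition is easy to see if you build the run input programmatically. The snippet below is a minimal sketch in plain Python, not part of Haystack: build_run_data is a hypothetical helper, and the "ranker" entry is an invented example of a component you might add later.

```python
from typing import Dict

# Component name -> name of the input that expects the query.
# The first three entries come from the pipeline above;
# "ranker" is a hypothetical later addition used only for illustration.
QUERY_INPUTS: Dict[str, str] = {
    "embedder": "text",
    "prompt_builder": "question",
    "answer_builder": "query",
    "ranker": "query",
}


def build_run_data(query: str, query_inputs: Dict[str, str]) -> Dict[str, Dict[str, str]]:
    """Repeat the same query once for every component that needs it."""
    return {component: {input_name: query} for component, input_name in query_inputs.items()}


run_data = build_run_data("Where does Mark live?", QUERY_INPUTS)
print(run_data)
```

Every new query-consuming component adds another entry that carries exactly the same string, which is the clutter a single entry point removes.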

Introducing a Multiplexer

The Multiplexer is a component that accepts multiple input connections and distributes the first value it receives to all components connected to its output. In this setting, you can use it by connecting it to the other pipeline components that expect a query at runtime.

Now, initialize the Multiplexer with the expected input type (in this case, str since the query is a string):

from haystack.components.others import Multiplexer

multiplexer = Multiplexer(str)
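
Conceptually, the fan-out behavior described above can be pictured with the toy class below. This is only an illustrative sketch in plain Python under simplifying assumptions, not Haystack's implementation: ToyMultiplexer, its connect method, and make_receiver are hypothetical names, and the real component is wired through Pipeline.connect() rather than callbacks.

```python
from typing import Any, Callable, Dict, List


class ToyMultiplexer:
    """Toy stand-in for illustration only; not Haystack's Multiplexer."""

    def __init__(self, type_: type) -> None:
        self.type_ = type_
        self.receivers: List[Callable[[Any], None]] = []

    def connect(self, receiver: Callable[[Any], None]) -> None:
        """Register a downstream consumer of the value."""
        self.receivers.append(receiver)

    def run(self, *values: Any) -> Dict[str, Any]:
        """Forward the first received value to every registered receiver."""
        if not values:
            raise ValueError("ToyMultiplexer received no value")
        first = values[0]
        if not isinstance(first, self.type_):
            raise TypeError(f"expected {self.type_.__name__}, got {type(first).__name__}")
        for receiver in self.receivers:
            receiver(first)
        return {"value": first}


received: Dict[str, str] = {}


def make_receiver(name: str) -> Callable[[str], None]:
    """Build a callback that records the value under the given input name."""

    def receive(value: str) -> None:
        received[name] = value

    return receive


mux = ToyMultiplexer(str)
for target in ("embedder.text", "prompt_builder.question", "answer_builder.query"):
    mux.connect(make_receiver(target))

result = mux.run("Where does Mark live?")
print(received)
```

One value goes in and every connected input receives the same copy, so the caller has to supply the query only once.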

Adding the Multiplexer to the Pipeline

Create the same RAG pipeline but this time with the Multiplexer. Add the Multiplexer to the pipeline and connect it to all the components that need the query as an input:

from haystack.components.embedders import HuggingFaceAPITextEmbedder
from haystack.components.retrievers.in_memory import InMemoryEmbeddingRetriever
from haystack.components.builders import PromptBuilder, AnswerBuilder
from haystack.components.generators import HuggingFaceAPIGenerator

template = """
 <|user|>
 Answer the question based on the given context.

Context:
{% for document in documents %}
    {{ document.content }}
{% endfor %}
Question: {{ question }}</s>
<|assistant|>
Answer:
"""
pipe = Pipeline()

pipe.add_component("multiplexer", multiplexer)

pipe.add_component(
    "embedder",
    HuggingFaceAPITextEmbedder(
        api_type="serverless_inference_api", api_params={"model": "sentence-transformers/all-MiniLM-L6-v2"}
    ),
)
pipe.add_component("retriever", InMemoryEmbeddingRetriever(document_store=document_store))
pipe.add_component("prompt_builder", PromptBuilder(template=template))
pipe.add_component(
    "llm",
    HuggingFaceAPIGenerator(api_type="serverless_inference_api", api_params={"model": "HuggingFaceH4/zephyr-7b-beta"}),
)
pipe.add_component("answer_builder", AnswerBuilder())

# Connect the Multiplexer to all the components that need the query
pipe.connect("multiplexer.value", "embedder.text")
pipe.connect("multiplexer.value", "prompt_builder.question")
pipe.connect("multiplexer.value", "answer_builder.query")

pipe.connect("embedder.embedding", "retriever.query_embedding")
pipe.connect("retriever", "prompt_builder.documents")
pipe.connect("prompt_builder", "llm")
pipe.connect("llm.replies", "answer_builder.replies")
pipe.connect("llm.meta", "answer_builder.meta")

Running the Pipeline with a Multiplexer

Run the pipeline that you updated with a Multiplexer. This time, instead of passing the query to embedder, prompt_builder and answer_builder separately, you only need to pass it to the multiplexer. As a result, you will get the same answer.

pipe.run({"multiplexer": {"value": "Where does Mark live?"}})

What’s next

🎉 Congratulations! You’ve simplified your pipeline run with a Multiplexer!

If you liked this tutorial, there’s more to learn about Haystack 2.0:

To stay up to date on the latest Haystack developments, you can sign up for our newsletter or join the Haystack Discord community.

Thanks for reading!