Creación de una interfaz de usuario interactiva para los flujos de trabajo de Llamaindex | por Lingzhen Chen | septiembre de 2024

Al iniciar el flujo de trabajo desde la terminal, es sencillo ver qué paso se está ejecutando y el registro que ponemos en esos pasos.

Registro de terminal para la ejecución del flujo de trabajo (captura de pantalla del autor)

También podemos habilitar la interacción humana en el circuito simplemente usando user_feedback = input()en el flujo de trabajo. Esto pausará el flujo de trabajo y esperará la entrada del usuario (consulte el ejemplo de intervención humana en este Llamaindex oficial) computadora portátil). Sin embargo, para poder lograr la misma funcionalidad en una interfaz fácil de usar, necesitamos modificaciones adicionales al flujo de trabajo original.

El flujo de trabajo puede tardar mucho tiempo en ejecutarse, por lo que para una mejor experiencia del usuario, Llamaindex proporcionó una forma de enviar eventos de transmisión para indicar el progreso del flujo de trabajo, como se muestra en el cuaderno. aquíEn mi flujo de trabajo, defino un WorkflowStreamingEvent clase para incluir información útil sobre el mensaje del evento, como el tipo de evento y desde qué paso se envía:

class WorkflowStreamingEvent(BaseModel):
event_type: Literal["server_message", "request_user_input"] = Field(
..., description="Type of the event"
)
event_sender: str = Field(
..., description="Sender (workflow step name) of the event"
)
event_content: Dict[str, Any] = Field(..., description="Content of the event")

Para habilitar el envío de eventos de transmisión, el paso de flujo de trabajo debe tener acceso al contexto compartido, lo que se hace agregando @step(pass_context=True) decorador a la definición del paso. Luego, en la definición del paso, podemos enviar mensajes de evento sobre el progreso a través del contexto. Por ejemplo, en el tavily_query() paso:

@step(pass_context=True)
async def tavily_query(self, ctx: Context, ev: StartEvent) -> TavilyResultsEvent:
ctx.data["research_topic"] = ev.user_query
query = f"arxiv papers about the state of the art of {ev.user_query}"
ctx.write_event_to_stream(
Event(
msg=WorkflowStreamingEvent(
event_type="server_message",
event_sender=inspect.currentframe().f_code.co_name,
event_content={"message": f"Querying Tavily with: '{query}'"},
).model_dump()
)
)

En este ejemplo, establecemos el event_type ser “server_message” . Significa que es un mensaje de actualización y no se requiere ninguna acción del usuario. Tenemos otro tipo de evento "request_user_input" que indica que se necesita una entrada del usuario. Por ejemplo, en el gather_feedback_outline() Paso en el flujo de trabajo, después de generar los esquemas de texto de las diapositivas a partir del resumen del artículo original, se envía un mensaje para solicitarle al usuario que brinde su aprobación y comentarios sobre el texto del esquema:

@step(pass_context=True)
async def gather_feedback_outline(
self, ctx: Context, ev: OutlineEvent
) -> OutlineFeedbackEvent | OutlineOkEvent:
"""Present user the original paper summary and the outlines generated, gather feedback from user"""
...

# Send a special event indicating that user input is needed
ctx.write_event_to_stream(
Event(
msg=json.dumps(
{
"event_type": "request_user_input",
"event_sender": inspect.currentframe().f_code.co_name,
"event_content": {
"summary": ev.summary,
"outline": ev.outline.dict(),
"message": "Do you approve this outline? If not, please provide feedback.",
},
}
)
)
)

...

Estos eventos se manejan de manera diferente en la API del backend y en la lógica del frontend, lo cual describiré en detalle en las secciones posteriores de este artículo.

Pasos del flujo de trabajo que requieren retroalimentación del usuario (imagen del autor)

Al enviar un "request_user_input" evento al usuario, solo queremos pasar al siguiente paso después Hemos recibido la entrada del usuario. Como se muestra en el diagrama de flujo de trabajo anterior, procede a la outlines_with_layout()paso si el usuario aprueba el esquema, o al summary2outline() volver a pasar si el usuario no lo aprueba.

Esto se consigue utilizando el Future() objeto de Python asyncio Biblioteca. En el SlideGenerationWorkflow clase, establecemos un atributo self.user_input_future = asyncio.Future() que se puede esperar en el gather_feedback_outline() Paso. La ejecución posterior del flujo de trabajo está condicionada al contenido de la retroalimentación del usuario:

@step(pass_context=True)
async def gather_feedback_outline(
self, ctx: Context, ev: OutlineEvent
) -> OutlineFeedbackEvent | OutlineOkEvent:
...

# Wait for user input
if not self.user_input_future.done():
user_response = await self.user_input_future
logger.info(f"gather_feedback_outline: Got user response: {user_response}")

# Process user_response, which should be a JSON string
try:
response_data = json.loads(user_response)
approval = response_data.get("approval", "").lower().strip()
feedback = response_data.get("feedback", "").strip()
except json.JSONDecodeError:
# Handle invalid JSON
logger.error("Invalid user response format")
raise Exception("Invalid user response format")

if approval == ":material/thumb_up:":
return OutlineOkEvent(summary=ev.summary, outline=ev.outline)
else:
return OutlineFeedbackEvent(
summary=ev.summary, outline=ev.outline, feedback=feedback
)

Configuramos el backend usando fastAPI, exponemos un punto final POST para manejar solicitudes e iniciamos la ejecución del flujo de trabajo. La función asincrónica run_workflow_endpoint() acepta ResearchTopic como entrada. En la función, un generador asincrónico event_generator() Se define una tarea que crea una tarea para ejecutar el flujo de trabajo y transmite los eventos al cliente a medida que avanza el flujo de trabajo. Cuando finaliza el flujo de trabajo, también transmite los resultados del archivo final al cliente.


class ResearchTopic(BaseModel):
query: str = Field(..., example="example query")

@app.post("/run-slide-gen")
async def run_workflow_endpoint(topic: ResearchTopic):
workflow_id = str(uuid.uuid4())

wf = SummaryAndSlideGenerationWorkflow(wid=workflow_id, timeout=2000, verbose=True)
wf.add_workflows(
summary_gen_wf=SummaryGenerationWorkflow(
wid=workflow_id, timeout=800, verbose=True
)
)
wf.add_workflows(
slide_gen_wf=SlideGenerationWorkflow(
wid=workflow_id, timeout=1200, verbose=True
)
)

async def event_generator():
loop = asyncio.get_running_loop()
logger.debug(f"event_generator: loop id {id(loop)}")
yield f"{json.dumps({'workflow_id': workflow_id})}\n\n"

task = asyncio.create_task(wf.run(user_query=topic.query))
logger.debug(f"event_generator: Created task {task}")
try:
async for ev in wf.stream_events():
logger.info(f"Sending message to frontend: {ev.msg}")
yield f"{ev.msg}\n\n"
await asyncio.sleep(0.1) # Small sleep to ensure proper chunking
final_result = await task

# Construct the download URL
download_pptx_url = f"http://backend:80/download_pptx/{workflow_id}"
download_pdf_url = f"http://backend:80/download_pdf/{workflow_id}"

final_result_with_url = {
"result": final_result,
"download_pptx_url": download_pptx_url,
"download_pdf_url": download_pdf_url,
}

yield f"{json.dumps({'final_result': final_result_with_url})}\n\n"
except Exception as e:
error_message = f"Error in workflow: {str(e)}"
logger.error(error_message)
yield f"{json.dumps({'event': 'error', 'message': error_message})}\n\n"
finally:
# Clean up
workflows.pop(workflow_id, None)

return StreamingResponse(event_generator(), media_type="text/event-stream")

Además de este punto final, existen puntos finales para recibir la entrada del usuario del cliente y gestionar las solicitudes de descarga de archivos. Dado que a cada flujo de trabajo se le asigna un ID de flujo de trabajo único, podemos asignar la entrada del usuario recibida del cliente al flujo de trabajo correcto. Al llamar al set_result() En la espera Futureel flujo de trabajo pendiente puede reanudar su ejecución.

@app.post("/submit_user_input")
async def submit_user_input(data: dict = Body(...)):
workflow_id = data.get("workflow_id")
user_input = data.get("user_input")
wf = workflows.get(workflow_id)
if wf and wf.user_input_future:
loop = wf.user_input_future.get_loop() # Get the loop from the future
logger.info(f"submit_user_input: wf.user_input_future loop id {id(loop)}")
if not wf.user_input_future.done():
loop.call_soon_threadsafe(wf.user_input_future.set_result, user_input)
logger.info("submit_user_input: set_result called")
else:
logger.info("submit_user_input: future already done")
return {"status": "input received"}
else:
raise HTTPException(
status_code=404, detail="Workflow not found or future not initialized"
)

El punto final de descarga también identifica dónde se encuentra el archivo final en función del ID del flujo de trabajo.

@app.get("/download_pptx/{workflow_id}")
async def download_pptx(workflow_id: str):
file_path = (
Path(settings.WORKFLOW_ARTIFACTS_PATH)
/ "SlideGenerationWorkflow"
/ workflow_id
/ "final.pptx"
)
if file_path.exists():
return FileResponse(
path=file_path,
media_type="application/vnd.openxmlformats-officedocument.presentationml.presentation",
filename=f"final.pptx",
)
else:
raise HTTPException(status_code=404, detail="File not found")

En la página de interfaz, después de que el usuario envía el tema de investigación a través de st.text_input()se inicia un proceso de larga ejecución en un hilo en segundo plano en un nuevo bucle de eventos para recibir los eventos transmitidos desde el backend, sin interferir con el resto de la página:

def start_long_running_task(url, payload, message_queue, user_input_event):
try:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(
get_stream_data(url, payload, message_queue, user_input_event)
)
loop.close()
except Exception as e:
message_queue.put(("error", f"Exception in background thread: {str(e)}"))

...

def main():

...

with st.sidebar:
with st.form(key="slide_gen_form"):
query = st.text_input(
"Enter the topic of your research:",
)
submit_button = st.form_submit_button(label="Submit")

if submit_button:
# Reset the workflow_complete flag for a new workflow
st.session_state.workflow_complete = False
# Start the long-running task in a separate thread
if (
st.session_state.workflow_thread is None
or not st.session_state.workflow_thread.is_alive()
):
st.write("Starting the background thread...")

st.session_state.workflow_thread = threading.Thread(
target=start_long_running_task,
args=(
"http://backend:80/run-slide-gen",
{"query": query},
st.session_state.message_queue,
st.session_state.user_input_event,
),
)
st.session_state.workflow_thread.start()
st.session_state.received_lines = []
else:
st.write("Background thread is already running.")

Los datos del evento transmitidos desde el backend se obtienen mediante httpx.AsyncClient y se colocan en una cola de mensajes para su posterior procesamiento. Se extrae información diferente según el tipo de evento. Para el tipo de evento “request_user_input”el hilo también se pausa hasta que se proporcione la entrada del usuario.

async def fetch_streaming_data(url: str, payload: dict = None):
async with httpx.AsyncClient(timeout=1200.0) as client:
async with client.stream("POST", url=url, json=payload) as response:
async for line in response.aiter_lines():
if line:
yield line

async def get_stream_data(url, payload, message_queue, user_input_event):
# message_queue.put(("message", "Starting to fetch streaming data..."))
data_json = None
async for data in fetch_streaming_data(url, payload):
if data:
try:
data_json = json.loads(data)
if "workflow_id" in data_json:
# Send workflow_id to main thread
message_queue.put(("workflow_id", data_json["workflow_id"]))
continue
elif "final_result" in data_json:
# Send final_result to main thread
message_queue.put(("final_result", data_json["final_result"]))
continue
event_type = data_json.get("event_type")
event_sender = data_json.get("event_sender")
event_content = data_json.get("event_content")
if event_type in ["request_user_input"]:
# Send the message to the main thread
message_queue.put(("user_input_required", data_json))
# Wait until user input is provided
user_input_event.wait()
user_input_event.clear()
continue
else:
# Send the line to the main thread
message_queue.put(("message", format_workflow_info(data_json)))
except json.JSONDecodeError: # todo: is this necessary?
message_queue.put(("message", data))
if data_json and "final_result" in data_json or "final_result" in str(data):
break # Stop processing after receiving the final result

Almacenamos los mensajes en el st.session_state y usa un st.expander() para mostrar y actualizar estos datos transmitidos.

if st.session_state.received_lines:
with expander_placeholder.container():
# Create or update the expander with the latest truncated line
expander = st.expander(st.session_state.expander_label)
for line in st.session_state.received_lines:
expander.write(line)
expander.divider()

Para garantizar que la interfaz de usuario siga respondiendo y muestre los mensajes de eventos cuando se procesan en un hilo en segundo plano, utilizamos un actualización automática componente para actualizar la página en un intervalo establecido:

if not st.session_state.workflow_complete:
st_autorefresh(interval=2000, limit=None, key="data_refresh")

Cuando el evento transmitido es de tipo “request_user_input”Mostraremos la información relacionada en un contenedor separado y recopilaremos los comentarios de los usuarios. Como puede haber varios eventos que requieran la entrada del usuario de una ejecución de flujo de trabajo, los colocamos en una cola de mensajes y nos aseguramos de asignar una clave única a los st.feedback(), st.text_area() y st.button() que están vinculados a cada evento para garantizar que los widgets no interfieran entre sí:

def gather_outline_feedback(placeholder):
container = placeholder.container()
with container:
if st.session_state.user_input_required:
data = st.session_state.user_input_prompt
event_type = data.get("event_type")
if event_type == "request_user_input":
summary = data.get("event_content").get("summary")
outline = data.get("event_content").get("outline")
prompt_message = data.get("event_content").get(
"message", "Please review the outline."
)

# display the content for user input
st.markdown("## Original Summary:")
st.text_area("Summary", summary, disabled=True, height=400)
st.divider()
st.markdown("## Generated Slide Outline:")
st.json(outline)
st.write(prompt_message)

# Define unique keys for widgets
current_prompt = st.session_state.prompt_counter
approval_key = f"approval_state_{current_prompt}"
feedback_key = f"user_feedback_{current_prompt}"

# Display the approval feedback widget
approval = st.feedback("thumbs", key=approval_key)
st.write(f"Current Approval state is: {approval}")
logging.info(f"Current Approval state is: {approval}")

# Display the feedback text area
feedback = st.text_area(
"Please provide feedback if you have any:", key=feedback_key
)

# Handle the submission of user response
if st.button(
"Submit Feedback", key=f"submit_response_{current_prompt}"
):
if not st.session_state.user_response_submitted:
# Retrieve approval and feedback using unique keys
approval_state = st.session_state.get(approval_key)
user_feedback = st.session_state.get(feedback_key, "")

# Ensure approval_state is valid
if approval_state not in [0, 1]:
st.error("Please select an approval option.")
return

user_response = {
"approval": (
":material/thumb_down:"
if approval_state == 0
else ":material/thumb_up:"
),
"feedback": user_feedback,
}
# Send the user's response to the backend

try:
response = requests.post(
"http://backend:80/submit_user_input",
json={
"workflow_id": st.session_state.workflow_id,
"user_input": json.dumps(user_response),
},
)
response.raise_for_status()
logging.info(
f"Backend response for submitting approval: {response.status_code}"
)
except requests.RequestException as e:
st.error(f"Failed to submit user input: {str(e)}")
return

...

Al final, cuando la ejecución del flujo de trabajo finalmente finaliza, el cliente frontend recibirá una respuesta que contiene la ruta a los archivos finales generados (la misma presentación en formato PDF para la representación en la interfaz de usuario y en formato PPTX para la descarga como resultado final). Mostramos el archivo PDF y creamos un botón para descargar el archivo PPTX:

  if "download_url_pdf" in st.session_state and st.session_state.download_url_pdf:
download_url_pdf = st.session_state.download_url_pdf
try:
# Fetch the PDF content
pdf_response = requests.get(download_url_pdf)
pdf_response.raise_for_status()
st.session_state.pdf_data = pdf_response.content

st.markdown("### Generated Slide Deck:")
# Display the PDF using an iframe
st.markdown(
f'<iframe src="data:application/pdf;base64,{base64.b64encode(st.session_state.pdf_data).decode()}" width="100%" height="600px" type="application/pdf"></iframe>',
unsafe_allow_html=True,
)
except Exception as e:
st.error(f"Failed to load the PDF file: {str(e)}")

# Provide the download button for PPTX if available
if (
"download_url_pptx" in st.session_state
and st.session_state.download_url_pptx
):
download_url_pptx = st.session_state.download_url_pptx
try:
# Fetch the PPTX content
pptx_response = requests.get(download_url_pptx)
pptx_response.raise_for_status()
pptx_data = pptx_response.content

st.download_button(
label="Download Generated PPTX",
data=pptx_data,
file_name="generated_slides.pptx",
mime="application/vnd.openxmlformats-officedocument.presentationml.presentation",
)
except Exception as e:
st.error(f"Failed to load the PPTX file: {str(e)}")

Crearemos una aplicación Docker multiservicio con docker-compose para ejecutar las aplicaciones frontend y backend.

version: '3.8'

services:
backend:
build:
context: ./backend
args:
- --no-cache
ports:
- "8000:80"
networks:
- app-network
volumes:
- .env:/app/.env
- ./data:/app/data
- ./workflow_artifacts:/app/workflow_artifacts
- ~/.azure:/root/.azure

frontend:
build:
context: ./frontend
args:
- --no-cache
ports:
- "8501:8501"
networks:
- app-network

networks:
app-network:

¡Eso es todo! Solo corre docker-compose upy ahora tenemos una aplicación que puede ejecutar un flujo de trabajo de investigación basado en la consulta ingresada por el usuario, solicitarle comentarios durante la ejecución y mostrarle el resultado final.