Hello and welcome back to the last part of the LangGraph/LangChain/LangSmith course. In this part, we’ll learn how to deal with asynchronous tools by building a graph that will do some web research for us, where one of the tools is going to be visiting several websites at once to feed info back into the graph.
This type of asynchronous action is very helpful when there are multiple steps or actions that can be performed at the same time, as it saves a lot of time and makes for a much better user experience. It is a bit different to set up and work with though, which is why we’ll be going through it in this part.
I will try to cover the bare basics of async Python programming here, as it can look quite confusing and I want all skill levels to be able to follow along. If you are already very familiar with async programming the level of explanation may be a bit excessive for you and you can probably skip over some of the explanations and just look at the code.
Web research tool
Let’s start by building our tool as usual. This tool is going to visit a bunch of web URLs at the same time (asynchronously) and return the HTML content of each page. We will need to install the BeautifulSoup
library to parse the HTML content of the pages. Run the following command in the terminal:
pipenv install beautifulsoup4==4.12.3
Then go ahead and create a new file called web.py
in the tools
directory:
📁 FINX_LANGGRAPH
    📁 images
    📁 output
    📁 tools
        📄 __init__.py
        📄 image.py
        📄 pdf.py
        📄 weather.py
        📄 web.py    ✨ New file
    📄 .env
    📄 langchain_basics.py
    📄 multi_agent.py
    📄 multi_agent_prompts.py
    📄 Pipfile
    📄 Pipfile.lock
    📄 setup_environment.py
    📄 simple_langgraph.py
In the web.py
file let’s start with our imports as usual:
import asyncio
import json
import sys

import aiohttp
from bs4 import BeautifulSoup
from langchain.tools import tool
from pydantic import BaseModel, Field
We import asyncio
to work with asynchronous code, aiohttp
to make HTTP requests asynchronously, and BeautifulSoup
to parse the HTML content of the pages. The tool
decorator and pydantic
imports are the same as for the other tools and json
is to return the JSON responses in string format.
Async and event loops
First of all, we’ll use the sys
import to set the type of event loop to use for the asynchronous code:
if sys.platform.startswith("win"):
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
Without going into too much detail, there is a known issue with the Python asyncio
library on Windows specifically that happens when the Proactor event loop (the default on Windows) is closed while there are still outstanding tasks. It doesn’t affect the correct execution of the code, but something about Windows + asyncio + LangChain/LangGraph triggers it. We’ll use the selector
event loop policy to avoid this issue (this is only needed if you’re on Windows).
While this tutorial part is way too short to really go in-depth on Python’s asynchronous programming, we’ll try to cover the basics as we go along. Basically, we get an event loop, and we can put tasks in there. Normally a task like fetching a webpage would block the code until it’s done, but with asyncio
we can put it in the event loop and continue with other tasks while it’s being fetched. This allows us to run multiple operations at the same time.
This is not to be confused with multi-threading or multi-processing, which are quite different in nature:
- Multi-processing: is about spreading tasks over a computer’s CPU cores, and is well suited for tasks that require lots of mathematical computations.
- Multi-threading: is about running multiple threads in the same process, and is well suited for tasks that are I/O bound (like fetching webpages).
- Asynchronous programming: is a single-process, single-threaded design that uses coroutines to handle multiple tasks concurrently. Async functions can pause and resume their execution, allowing other tasks to run during these pauses.
Async programming in Python is very similar to the JavaScript async/await pattern, and it’s a great way to handle I/O-bound tasks like fetching web pages. If you’re a bit new to this all, just keep going and you’ll get a feel for how it works.
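If that still feels abstract, here is a minimal standalone sketch (just the standard library, nothing from this project) showing two slow tasks overlapping their waiting time:

import asyncio
import time

async def slow_task(name: str, delay: float) -> str:
    await asyncio.sleep(delay)  # stands in for I/O, like waiting on a web server
    return f"{name} done"

async def main():
    start = time.time()
    # Both coroutines wait at the same time, so this takes ~1 second, not ~2.
    results = await asyncio.gather(slow_task("first", 1), slow_task("second", 1))
    print(results, f"- took {time.time() - start:.1f} seconds")

asyncio.run(main())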
Parsing HTML content
First, we’ll write a very basic function that uses BeautifulSoup to parse some HTML content:
def parse_html(html_content: str) -> str:
    soup = BeautifulSoup(html_content, "html.parser")
    for tag in ["nav", "footer", "aside", "script", "style", "img", "header"]:
        for match in soup.find_all(tag):
            match.decompose()
    text_content = soup.get_text()
    text_content = " ".join(text_content.split())
    return text_content[:8_000]
This function takes the HTML content of a webpage as a string and returns a string with the text content of the page. First we instantiate a new BeautifulSoup object passing in the html.parser
string to select the parser. We then make a list of all the HTML tags we want to filter out, namely the navigation, footer, aside, script, style, image and header tags. We’re interested in the main content and don’t want all this pollution.
For each tag in this list of HTML tags, we run soup.find_all(tag)
to find all the tags with that name in the HTML content, which returns all the matches for that tag. This allows us to loop over each match in soup.find_all(tag)
and call match.decompose()
to remove the tag from the HTML content.
We then get the text content of the page with soup.get_text()
to remove as much HTML and unneeded stuff as possible from what was left.
Then we call text_content.split()
to split the text content into a list of words, which has the side effect of removing long runs of whitespace, tab, and newline characters. We then join the list of words back into a string with " ".join
so that we’re left with only a single space between all words to save space. The LLM does not care about formatting and sending tons of whitespace to it is just a waste of space.
Finally, we return the first 8,000 characters of the text content, to make sure we don’t exceed the context limit if we load 5 or 6 pages at once. You can set this higher if you use GPT-4-turbo instead of GPT-3.5-turbo.
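If you want a feel for what parse_html does, a quick hypothetical check (the HTML snippet here is made up purely for illustration) could look like this:

sample = "<html><nav>Menu</nav><body><p>Hello     world</p><script>track();</script></body></html>"
print(parse_html(sample))  # prints: Hello world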
Fetching webpages
Notice that the parse_html
function is just a regular synchronous function. Now let’s get started on the asynchronous part. The first thing we’ll do is write a function to fetch the HTML content of a single webpage, and then we can just call this function multiple times to fetch the content of multiple pages at once.
async def get_webpage_content(url: str) -> str:
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            html_content = await response.text()
    text_content = parse_html(html_content)
    print(f"URL: {url} - fetched successfully.")
    return text_content
First, we declare our async function using async def
instead of the normal def
. This will allow us to later call await
on this function to make the code non-blocking and run other tasks while we wait for the response. We take a URL string and return a string.
Where we would normally fetch a webpage with the requests
library, here we need to use aiohttp
which is an asynchronous HTTP client/server library for Python that allows us to write this non-blocking code. The ClientSession
object represents a single web session, so you could set headers or cookies here that apply to all requests in this session.
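As a small illustration of that point, a variant of the fetch function that sets a shared header on the session might look something like this (the function name and header value are made up; we don’t need them for this tool):

async def get_webpage_content_with_headers(url: str) -> str:
    # Hypothetical variant: every request made through this session sends the same User-Agent.
    async with aiohttp.ClientSession(headers={"User-Agent": "my-research-bot/0.1"}) as session:
        async with session.get(url) as response:
            html_content = await response.text()
    return parse_html(html_content)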
The whole thing can be used as a context manager giving us the async with aiohttp.ClientSession() as session
syntax and any indented code afterward now takes place inside this context. Then we call get(url)
on the session object and use that as a context manager in the same exact manner as the line above it.
The line after calls await
on the response.text()
and will then save this in the html_content
variable. This await
keyword is the magic, and whenever you see this keyword it sort of pauses this code, as time is needed to fetch the webpage. While this is happening, other tasks can run in the event loop.
When the html_content
has finished fetching, we move outside of the two async context managers and call our parse_html
function to get the text content of the page. We then print a message to the console that the URL was fetched successfully and return the text content.
Note that we could easily edit the above function to fetch the whole list of URLs we have inside the same ClientSession
context manager, but as the overhead to calling this function a couple of times is minimal, I’ll just keep it as is for now.
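For completeness, a shared-session variant could look roughly like this (a sketch only, not used in the rest of this part; the function name is made up):

async def get_webpage_contents(urls: list[str]) -> list[str]:
    # One ClientSession is reused for every URL instead of opening a new one per call.
    async with aiohttp.ClientSession() as session:

        async def fetch(url: str) -> str:
            async with session.get(url) as response:
                return parse_html(await response.text())

        return await asyncio.gather(*(fetch(url) for url in urls))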
Another fair point to make is that the parse_html
function is technically blocking, non-async code, but as it doesn’t take long to run at all, it’s fine to keep it here. The main time-waster is fetching the webpages, and we made that part asynchronous.
Input arguments and the tool
Before we get to the actual tool itself we need to make sure to define our pydantic object with the input arguments for the tool:
class ResearchInput(BaseModel):
    research_urls: list[str] = Field(description="Must be a list of valid URLs.")
No surprises here, we just want a list of URLs in string format. We’ve used this type of object several times before.
Now let’s write our tool, starting with the first half:
@tool("research", args_schema=ResearchInput) async def research(research_urls: list[str]) -> str: """Get content of provided URLs for research purposes.""" tasks = [asyncio.create_task(get_webpage_content(url)) for url in research_urls]
We use the @tool
decorator to define our tool, passing in the name and the argument schema as always. We declare the function making sure to use async def
, and we declare the same research_urls
argument as we defined in the ResearchInput
class. Again mind the docstring description for the LLM to use.
Then we use a list comprehension; let’s read it from right to left. For each url
in the list of research_urls
, we call asyncio.create_task(get_webpage_content(url))
to create a task for each URL. The asyncio.create_task()
function schedules the coroutine to run on the event loop and returns a Task object. However, it doesn’t automatically await the task.
What this means is that it will create our async task and also start it for us, but it won’t await it (wait for it to finish), which would block the code. We are left with a list of Task objects for fetches that are already running but not yet finished.
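To make the difference concrete, here is a small hypothetical contrast (the URL is just a placeholder):

async def single_fetch_demo():
    # Direct await: this coroutine pauses here until the page has been fetched.
    content_a = await get_webpage_content("https://example.com")

    # create_task: the fetch is scheduled and starts running, but we are not waiting for it yet.
    task = asyncio.create_task(get_webpage_content("https://example.com"))
    # ... other work could happen here while the fetch is in flight ...
    content_b = await task  # only now do we wait for this particular fetch to finish
    return content_a, content_b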
Let’s finish our tool:
@tool("research", args_schema=ResearchInput) async def research(research_urls: list[str]) -> str: """Get content of provided URLs for research purposes.""" tasks = [asyncio.create_task(get_webpage_content(url)) for url in research_urls] contents = await asyncio.gather(*tasks, return_exceptions=True) return json.dumps(contents)
The asyncio.gather()
function is used to schedule multiple tasks to run and waits for all of them to complete. It will wait for all our tasks from the previous line to fetch their web pages and then gather the results. This is why we await
this function, and then save the results in contents
. *tasks
is a way to unpack the list of tasks into separate arguments that are then passed into the function.
The return_exceptions
parameter in asyncio.gather()
determines how exceptions are handled. If return_exceptions
is set to False
, gather()
will immediately raise the first exception it encounters. When set to True
, instead of raising exceptions, it will return them in the result list so that contents
will be a list of results or exceptions. We use this as we want to go ahead and fetch the rest of the pages even if one fails.
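Here is a tiny standalone illustration of the difference this flag makes (toy coroutines, nothing from our tool):

import asyncio

async def ok() -> str:
    return "page content"

async def boom() -> str:
    raise ValueError("failed fetch")

async def demo():
    # With return_exceptions=True the error is returned alongside the good result
    # instead of being raised and cancelling everything else.
    results = await asyncio.gather(ok(), boom(), return_exceptions=True)
    print(results)  # ['page content', ValueError('failed fetch')]

asyncio.run(demo())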
Finally, we dump the response to a JSON string and return it, as LLMs naturally need string input.
Testing the tool
Now let’s add a quick test to this file to test our tool in isolation and make sure there are no problems:
if __name__ == "__main__":
    import time

    TEST_URLS = [
        "https://en.wikipedia.org/wiki/SpongeBob_SquarePants",
        "https://en.wikipedia.org/wiki/Stephen_Hillenburg",
        "https://en.wikipedia.org/wiki/The_SpongeBob_Movie:_Sponge_Out_of_Water",
    ]

    async def main():
        result = await research.ainvoke({"research_urls": TEST_URLS})
        with open("test.json", "w") as f:
            json.dump(result, f)

    start_time = time.time()
    asyncio.run(main())
    end_time = time.time()
    print(f"Async time: {end_time - start_time} seconds")
We’ve covered the if __name__ == "__main__":
block before, so only if we run this file directly will the code inside this block run. We define a list of test URLs to use and then define an async function called main()
to run our tool with these test URLs. Instead of invoking the tool as we normally do, we now use ainvoke
for the async version, and we have to await
the result. This is why the main function is async
as well.
We then open a file called test.json
in write mode and dump the result to it so we can have a quick look to check if the output is as expected. Finally, we run the main()
function with asyncio.run(main()).
asyncio.run
is a useful function that creates a new event loop, runs the given coroutine which is main
in our case, closes the loop, and then returns the result. This makes it a convenient way to run async code from a synchronous context as it handles the whole event loop thing for us.
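Under the hood, asyncio.run(main()) does roughly the following (a simplified sketch; the real implementation also cancels leftover tasks and shuts down async generators):

loop = asyncio.new_event_loop()
try:
    result = loop.run_until_complete(main())
finally:
    loop.close()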
I’ve also sneaked a start and end timer in there using time.time()
to see how long it takes to run the async code.
Now go ahead and run the web.py
file and you’ll see something like this:
URL: https://en.wikipedia.org/wiki/The_SpongeBob_Movie:_Sponge_Out_of_Water - fetched successfully.
URL: https://en.wikipedia.org/wiki/Stephen_Hillenburg - fetched successfully.
URL: https://en.wikipedia.org/wiki/SpongeBob_SquarePants - fetched successfully.
Async time: 2.9387967586517334 seconds
I have also tried a normal synchronous version of this code using the requests library, and it took over 7 seconds, so we have a considerable time save here, and this is with only 3 URLs. If I increase the number of URLs to just 6, the async version takes about 4 seconds, while the synchronous version takes about 14.
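A rough sketch of what such a synchronous comparison might look like (assuming the requests library is installed, and reusing TEST_URLS and parse_html from this file):

import time
import requests

start_time = time.time()
for url in TEST_URLS:
    parse_html(requests.get(url).text)  # each fetch blocks until the previous one has finished
print(f"Sync time: {time.time() - start_time} seconds")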
If you open the test.json
file that has been created you should see something like the following that goes on for quite a while:
"[\"SpongeBob SquarePants - Wikipedia Jump to content From Wikipedia, the free encyclopedia American animated television series This article is about the television series.........
Web research graph setup
We have just written our first async tool! Now let’s put it to good use and write up a quick web research graph. In your root folder create two new files called web_research.py
and web_research_prompts.py
:
📁 FINX_LANGGRAPH
    📁 images
    📁 output
    📁 tools
        📄 __init__.py
        📄 image.py
        📄 pdf.py
        📄 weather.py
        📄 web.py
    📄 .env
    📄 langchain_basics.py
    📄 multi_agent.py
    📄 multi_agent_prompts.py
    📄 Pipfile
    📄 Pipfile.lock
    📄 setup_environment.py
    📄 simple_langgraph.py
    📄 web_research.py    ✨ New file
    📄 web_research_prompts.py    ✨ New file
The graph here will be reasonably simple, having two agents. One of them will use Tavily to do a basic search query, and the other one will use our async tool to do more in-depth research on the URLs provided by the first Tavily agent. You know the drill by now, so we’ll just define our system prompts for the agents before we get started on the main file. If you’re watching the video version of this tutorial make sure you open up the written version so you can more easily copy these. Start by opening up the web_research_prompts.py
file.
We’ll get started with the Tavily agent’s system prompt first:
TAVILY_AGENT_SYSTEM_PROMPT = """
You are a search agent. Your task is simple. Use your tool to find results on the internet for the user query, and return the response, making sure to include all the sources with page title and URL at the bottom like this example:

1. [Title 1](https://www.url1.com/whatever): ...
2. [Title 2](https://www.url2.com/whatever): ...
3. [Title 3](https://www.url3.com/whatever): ...
4. [Title 4](https://www.url4.com/whatever): ...
5. [Title 5](https://www.url5.com/whatever): ...

Make sure you only return the URLs that are relevant for doing additional research. For instance:

User query: Spongebob

Results from calling your tool:
1. [The SpongeBob Official Channel on YouTube](https://www.youtube.com/channel/UCx27Pkk8plpiosF14qXq-VA): ...
2. [Wikipedia - SpongeBob SquarePants](https://en.wikipedia.org/wiki/SpongeBob_SquarePants): ...
3. [Nickelodeon - SpongeBob SquarePants](https://www.nick.com/shows/spongebob-squarepants): ...
4. [Wikipedia - Excavators](https://en.wikipedia.org/wiki/Excavator): ...
5. [IMDB - SpongeBob SquarePants TV Series](https://www.imdb.com/title/tt0206512/): ...

Given the results above and an example topic of Spongebob, the Youtube channel is going to be relatively useless for written research, so you should skip it from your list. The Wikipedia article on Excavators is not related to the topic, which is Spongebob for this example, so it should be omitted. The others are relevant so you should include them in your response like this:

1. [Wikipedia - SpongeBob SquarePants](https://en.wikipedia.org/wiki/SpongeBob_SquarePants): ...
2. [Nickelodeon - SpongeBob SquarePants](https://www.nick.com/shows/spongebob-squarepants): ...
3. [IMDB - SpongeBob SquarePants TV Series](https://www.imdb.com/title/tt0206512/): ...
"""
This is a bit of a long prompt, but it’s quite simple. The Tavily agent is tasked with finding relevant URLs for a given query, and then returning the URLs that are relevant for further research. The prompt gives an example of what the response should look like and also gives an example of what URLs are relevant and what URLs are not.
Now let’s define the system prompt for the web research agent:
RESEARCHER_SYSTEM_PROMPT = """
You are an internet research information-providing agent. You will receive results for a search query. The results will look something like this:

1. [Wikipedia - SpongeBob SquarePants](https://en.wikipedia.org/wiki/SpongeBob_SquarePants): ...
2. [Nickelodeon - SpongeBob SquarePants](https://www.nick.com/shows/spongebob-squarepants): ...
3. [IMDB - SpongeBob SquarePants TV Series](https://www.imdb.com/title/tt0206512/): ...

Your job is to use your research tool to find more information on the topic and to write an article about the information you find in markdown format. You will call the research tool with a list of URLs, so for the above example your tool input will be:

["https://en.wikipedia.org/wiki/SpongeBob_SquarePants", "https://www.nick.com/shows/spongebob-squarepants", "https://www.imdb.com/title/tt0206512/"]

After you have finished your research you will write a long-form article on all the information you found and return it to the user, making sure not to leave out any relevant details. Make sure you include as much detail as possible and that the article you write is on the topic (for instance Pokemon) instead of being about the websites that you visited (e.g. Wikipedia, YouTube).

Use markdown formatting and supply ONLY the resulting article in your response, with no extra chatter except for the fully formed, well-written, and formatted article. Use headers, sub-headers, bolding, bullet lists, and other markdown formatting to make the article easy to read and understand. Your only output will be the fully formed and detailed markdown article.
"""
The agent is tasked with using the web research tool to find more information on a topic and then writing an article about the information found. The prompt gives an example of what the input to the tool should look like and then specific instructions on using markdown formatting to write the output article and details on the article we want it to write. Save and close the web_research_prompts.py
file.
Web research graph main file
Now let’s move on to the main file web_research.py
and start by importing the necessary modules:
import asyncio
import functools
import operator
import uuid
from typing import Annotated, Sequence, TypedDict

from langchain.agents import AgentExecutor, create_openai_tools_agent
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_core.messages import BaseMessage, HumanMessage
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_openai import ChatOpenAI
from langgraph.graph import END, StateGraph

from setup_environment import set_environment_variables
from tools.pdf import OUTPUT_DIRECTORY
from tools.web import research
from web_research_prompts import RESEARCHER_SYSTEM_PROMPT, TAVILY_AGENT_SYSTEM_PROMPT
You’ve seen pretty much all of these imports before in some part of our code so far. We import the prompts we just created and the web research function as well as the OUTPUT_DIRECTORY
we defined in the pdf.py
file so that we can access this folder to save our output. To do this properly, it would be best to store project-wide constants like these paths in a separate file, but for now we’ll just import it from pdf.py.
Now continue below the imports:
set_environment_variables("Web_Search_Graph")

TAVILY_TOOL = TavilySearchResults(max_results=6)
LLM = ChatOpenAI(model="gpt-3.5-turbo-0125")

TAVILY_AGENT_NAME = "tavily_agent"
RESEARCH_AGENT_NAME = "search_evaluator_agent"
SAVE_FILE_NODE_NAME = "save_file"
We load up our variables and use the project name Web_Search_Graph
for our LangSmith traces. We create a new instance of the Tavily search tool we imported, setting max_results
to 6, and we create a ChatOpenAI
object as usual. After that we set up some string constants for the names of our agents and nodes again.
We’ll have the create_agent
function which is basically the same as last time:
def create_agent(llm: ChatOpenAI, tools: list, system_prompt: str):
    prompt = ChatPromptTemplate.from_messages(
        [
            ("system", system_prompt),
            MessagesPlaceholder(variable_name="messages"),
            MessagesPlaceholder(variable_name="agent_scratchpad"),
        ]
    )
    agent = create_openai_tools_agent(llm, tools, prompt)
    executor = AgentExecutor(agent=agent, tools=tools)  # type: ignore
    return executor
No real changes there so let’s move on to the AgentState definition:
class AgentState(TypedDict):
    messages: Annotated[Sequence[BaseMessage], operator.add]
We have a simple list of BaseMessage
objects and every node in the graph will add a message to this list as the state passes through that particular node.
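Conceptually, the operator.add annotation tells LangGraph to merge each node's returned messages into the existing list by concatenation. A rough standalone illustration of that merge:

import operator
from langchain_core.messages import HumanMessage

existing = [HumanMessage(content="first message")]
update = [HumanMessage(content="second message")]

# operator.add on two lists is plain list concatenation, i.e. existing + update.
merged = operator.add(existing, update)
print([m.content for m in merged])  # ['first message', 'second message']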
Creating our nodes
Now we’ll have a basic function to create a new agent node similar to what we’ve done before, but this time we’ll also have one to create an async agent node:
def agent_node(state: AgentState, agent, name):
    result = agent.invoke(state)
    return {"messages": [HumanMessage(content=result["output"], name=name)]}


async def async_agent_node(state: AgentState, agent, name):
    result = await agent.ainvoke(state)
    return {"messages": [HumanMessage(content=result["output"], name=name)]}
The first one is pretty much the same as the one we used before, making sure we return a message in line with what we defined each node will add to the state object. The second one is the async version of the same function. We use async def
and here we await
the agent’s ainvoke
method instead of just calling the normal invoke
method.
Now we can create our Tavily agent and our research agent:
tavily_agent = create_agent(LLM, [TAVILY_TOOL], TAVILY_AGENT_SYSTEM_PROMPT)
tavily_agent_node = functools.partial(
    agent_node, agent=tavily_agent, name=TAVILY_AGENT_NAME
)

research_agent = create_agent(LLM, [research], RESEARCHER_SYSTEM_PROMPT)
research_agent_node = functools.partial(
    async_agent_node, agent=research_agent, name=RESEARCH_AGENT_NAME
)
You’ve seen all of this before, but make sure you use the async_agent_node
function for the research agent instead of the normal one.
Now we need one more node that will take the output of the research agent and write it to a file for us. This node does not need any agents or LLM action, so we can just define it as a normal function:
def save_file_node(state: AgentState):
    markdown_content = str(state["messages"][-1].content)
    filename = f"{OUTPUT_DIRECTORY}/{uuid.uuid4()}.md"
    with open(filename, "w", encoding="utf-8") as file:
        file.write(markdown_content)
    return {
        "messages": [
            HumanMessage(
                content=f"Output written successfully to {filename}",
                name=SAVE_FILE_NODE_NAME,
            )
        ]
    }
This shows that the graph is really nothing but a state machine. We can just write any arbitrary function and use it as a node as long as we meet the conditions we set for the graph. The function takes the AgentState object as input, does whatever it wants to do, and then adds an update to the AgentState object as promised. It doesn’t matter that there is no agent or LLM in this step.
In this case, we extract the markdown content from the state object’s last message [-1]
which is the research node’s output. We then generate a random filename using the uuid
module and write the markdown content to a file with that name and the .md
extension. Finally, we return a message to the state object that the output was written successfully.
Piecing our graph together
Now we can define our graph:
workflow = StateGraph(AgentState)

workflow.add_node(TAVILY_AGENT_NAME, tavily_agent_node)
workflow.add_node(RESEARCH_AGENT_NAME, research_agent_node)
workflow.add_node(SAVE_FILE_NODE_NAME, save_file_node)

workflow.add_edge(TAVILY_AGENT_NAME, RESEARCH_AGENT_NAME)
workflow.add_edge(RESEARCH_AGENT_NAME, SAVE_FILE_NODE_NAME)
workflow.add_edge(SAVE_FILE_NODE_NAME, END)

workflow.set_entry_point(TAVILY_AGENT_NAME)

research_graph = workflow.compile()
We just go from the Tavily agent to the research agent, and then from the research agent to the save file node. This example is pretty simple as we’re focusing on the async part. We can always add this to more complex graphs later on if we need to.
Now let’s create a main function to run the graph:
async def run_research_graph(input):
    async for output in research_graph.astream(input):
        for node_name, output_value in output.items():
            print("---")
            print(f"Output from node '{node_name}':")
            print(output_value)
            print("\n---\n")
This function is an async function that takes an input and then runs the graph with that input. It uses an async for loop to iterate over the output of the graph after we run astream
(async stream) on it. For each output, we get the node’s name and the output value, so we print both to the console to see what is going on live.
Now we can run the graph with a simple test input:
test_input = {"messages": [HumanMessage(content="Jaws")]}

asyncio.run(run_research_graph(test_input))
We create the first input message for the state object and then use asyncio.run
as we did before because it takes care of the event loop that runs the async code for us. Save and run this file and you should see the graph running and outputting the results to the console:
API Keys loaded and tracing set with project name: Web_Search_Graph
---
Output from node 'tavily_agent':
{'messages': [HumanMessage(content='Here are some relevant sources about "Jaws": ... ', name='tavily_agent')]}

---

URL: https://www.imdb.com/title/tt0073195/ - fetched successfully.
URL: https://www.rottentomatoes.com/m/jaws - fetched successfully.
URL: https://www.britannica.com/topic/Jaws-film-by-Spielberg - fetched successfully.
URL: https://en.wikipedia.org/wiki/Jaws_(film) - fetched successfully.
---
Output from node 'search_evaluator_agent':
{'messages': [HumanMessage(content='# **Jaws: A Deep Dive into the Iconic Film**\n\n## markdown summary here... ', name='search_evaluator_agent')]}

---

---
Output from node 'save_file':
{'messages': [HumanMessage(content='Output written successfully to c:\\Coding_Vault\\FINX_LANGGRAPH_TUTS\\output/d22855f8-9f76-4fc6-8192-7490852e1644.md', name='save_file')]}

---

---
Output from node '__end__':
{'messages': ['The whole state object...']}

---
Go ahead and open the .md
file that was created in the output
folder and you should see the markdown article that was written by the research agent:
I’ve gone ahead and tried another one inputting the topic “Pokemon”:
There you go! We’ve created a pretty fast and very useful internet research and article-writing tool!
From here on we can create PDF files, send emails, write articles, or do anything and everything we want really. We can tweak the output or the number of input URLs, or use gpt-4-turbo if we want a very long output article and large input context window so we can use even more sources.
We can add any conditional edges and paths and have the agents do whatever we want! All we’ve shown is just the basic ways in which you can combine stuff. You now have all the knowledge you need to build whatever you want. I’ll leave the rest up to your imagination.
It’s been a pleasure to take this journey together. I hope you learned a lot and had some fun along the way. I’ll see you again soon in the next one, until then, happy coding!