Welcome back to part 5, where we’ll take a look at OpenAI’s new text-to-speech API.
Text-to-speech is nothing new; robotic computer voices have been famous for a long, long time. What is new is that the quality is now so good it's almost scary! So let's jump right in.
Create a new folder called '5_Text_to_speech' and inside, create a new file named 'text_to_speech.py', like this:
📁FINX_OPENAI_UPDATES (root project folder)
    📁1_Parallel_function_calling
    📁2_JSON_mode_and_seeds
    📁3_GPT4_turbo
    📁4_DALLE
    📁5_Text_to_speech
        📄text_to_speech.py
Inside, we’ll get started with our imports:
```python
from decouple import config
from typing import Literal, get_args
from pathlib import Path
from openai import OpenAI
```
All of these should be thoroughly familiar by now except for the typing imports, which we'll cover as we use them.
Note that the typing imports are not required to use the API; I just happened to use them in this example code to clarify what range of options is available for certain arguments. You may leave them out if you dislike type hints in Python, or try them out if you're not yet familiar with them.
We continue our code as follows:
```python
client = OpenAI(api_key=config("OPENAI_API_KEY"))
current_directory = Path(__file__).parent

Model = Literal["tts-1", "tts-1-hd"]
Voice = Literal["alloy", "echo", "fable", "onyx", "nova", "shimmer"]
```
We set up our client and current_directory path, as is well familiar by now. We also set up two type aliases, Model and Voice, which are both Literal types.
Literal types are a special type hint that allows us to specify a range of options for a given variable.
In this case, we're specifying that the Model type can only be one of two options, "tts-1" or "tts-1-hd". The Voice type can only be one of six options: "alloy", "echo", "fable", "onyx", "nova", or "shimmer".
If you're not that familiar with types, note that Model is a type alias, not a variable on its own. All it says is that any variable of type Model should have a value of either "tts-1" or "tts-1-hd".
```python
model_name: Model = "tts-1"       # valid
model_name: Model = "robo-voice"  # invalid, as it's not an option on the Model type alias
```
This is a great way to make sure we don’t accidentally pass in an invalid argument, and also to make it clear to anyone reading our code what the valid options are.
Note that you won't actually get an error when passing in an invalid value unless you have a type checker enabled in your IDE, but even without a type checker the type hints still help clarify your code.
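To make that concrete, here is a small hypothetical sketch (not part of the tutorial's final code) of how you could enforce the check at runtime yourself, since type hints alone won't stop an invalid string when the program actually runs:

```python
def validate_model(model: str) -> None:
    # Hypothetical runtime guard: type hints aren't enforced at runtime,
    # so an explicit check raises early on a bad value.
    valid_models = ("tts-1", "tts-1-hd")
    if model not in valid_models:
        raise ValueError(f"Invalid model {model!r}, expected one of {valid_models}")

validate_model("tts-1")        # passes silently
# validate_model("robo-voice") # would raise a ValueError
```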
Calling the TTS API
Ok so now let’s have a quick reusable function that both uses and clarifies how to use the OpenAI text-to-speech API:
```python
def text_to_speech(
    input_text: str,
    model: Model,
    voice: Voice,
    file_name: str,
):
    response = client.audio.speech.create(
        model=model,
        voice=voice,
        input=input_text,
    )
    response.stream_to_file(f"{current_directory}/{file_name}.mp3")
```
We take an input text as a string, a model which has to be of type Model (so one of the available options in the type alias), a voice which has to be of type Voice, and a file_name which is just a string. We then call the OpenAI API client's audio.speech.create endpoint with the given arguments.
Note the handy stream_to_file method provided on the response object, which allows us to save the response to a file by providing a path as argument. We combine the current_directory defined at the top of our file with the file_name argument to create the file path.
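A quick single test call might look like this (hypothetical example text and file name; it would save 'hello_test.mp3' next to the script):

```python
text_to_speech(
    input_text="Hello from the text-to-speech API!",
    model="tts-1",
    voice="alloy",
    file_name="hello_test",
)
```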
Testing the Different Voices
So let’s have a listen to the various voices:
```python
for voice in get_args(Voice):
    text_to_speech(
        input_text="My name is Spongebob and I love pineapples!",
        model="tts-1",
        voice=voice,
        file_name=voice,
    )
```
The get_args function we imported from typing gets all the possible options from the Voice type alias, which is the six voices we defined earlier. We then loop over them and call our text_to_speech function with the given arguments. If you run this, an mp3 file will be created for each voice and saved in the current directory (alloy.mp3, echo.mp3, etc.).
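If you're curious what get_args actually returns, here is a minimal sketch you can run on its own:

```python
from typing import Literal, get_args

Voice = Literal["alloy", "echo", "fable", "onyx", "nova", "shimmer"]
print(get_args(Voice))
# ('alloy', 'echo', 'fable', 'onyx', 'nova', 'shimmer')
```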
So go ahead and run this and then have a listen to the 6 different voice files.
The voices really do differ, each with its own inflections and tone. Of course, nothing beats a real voice actor, but they sound very natural and human-like for computer voices.
tts-1 vs tts-1-hd
Now let’s have a listen to the different models to see if the HD option makes a big difference.
We’ll generate one file for each model using the echo voice:
```python
for model in get_args(Model):
    text_to_speech(
        input_text="My name is Spongebob and I love pineapples!",
        model=model,
        voice="echo",
        file_name=model,
    )
```
Run this and another two mp3 files will be created: "tts-1.mp3" and "tts-1-hd.mp3".
I don't hear a big change in quality, though this is a very small sample. Interestingly enough, there is a change in the voice inflection and the emphasis on the word 'love' in the HD version, so the difference doesn't seem to be limited to mere sound quality.
Building a Talking ChatGPT
That’s enough introduction, let’s build something cool with it!
Create a new file called 'talking_gpt.py' and let's create a ChatGPT version that will speak the answer to us:
📁FINX_OPENAI_UPDATES (root project folder)
    📁1_Parallel_function_calling
    📁2_JSON_mode_and_seeds
    📁3_GPT4_turbo
    📁4_DALLE
    📁5_Text_to_speech
        📄text_to_speech.py
        📄talking_gpt.py
The example we'll be looking at here is a talkingGPT that will take our question and then speak the answer to us. We will compare downloading the response and then playing it versus streaming it directly to our speakers before the entire response has even been generated.
A small disclaimer before we start this part, handling audio files can be a bit tricky when doing this for the first time and you may find you run into specific errors with missing dependencies or codecs.
I've tried to use the simplest audio libraries and solution possible to keep your issues to a minimum, like avoiding the somewhat buggy pydub library.
But if you do run into issues, I suggest you google the error message and find a solution that way. Having to troubleshoot on your own and fix potential bugs is an almost daily part of software development and I unfortunately cannot help you solve system-specific issues.
That being said, you hopefully won't run into any issues with the libraries chosen and used here.
First, open a shell to install a couple of dependencies we need. Run the following command in your terminal:
```bash
pip install PyAudio soundfile playsound
```
Now open your 'talking_gpt.py' file and let's get started with our imports, Python built-in imports first:
```python
import io
from distutils.util import strtobool
from pathlib import Path
from typing import Literal
from time import sleep, time
```
The io module contains Python's core tools for working with streams, and the time module will be used to time our responses. The strtobool function simply converts a string like 'yes' or 'no' to a boolean value, and the remaining two imports are already familiar from previous parts. (Heads-up: distutils is deprecated and has been removed in Python 3.12 and later, so this import may fail on newer versions.)
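If the distutils import fails on your Python version, here is a minimal stand-in you could define yourself; a sketch assuming you only need the usual yes/no style values:

```python
def strtobool(value: str) -> int:
    # Drop-in replacement for distutils.util.strtobool:
    # returns 1 for truthy strings, 0 for falsy ones, raises ValueError otherwise.
    value = value.strip().lower()
    if value in ("y", "yes", "t", "true", "on", "1"):
        return 1
    if value in ("n", "no", "f", "false", "off", "0"):
        return 0
    raise ValueError(f"Invalid truth value: {value!r}")
```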
Now our other library imports:
```python
import pyaudio
import requests
import soundfile as sf
from decouple import config
from openai import OpenAI
from playsound import playsound
import threading
```
We will use pyaudio to work with audio in our streaming function, requests to make a manual HTTP request to the OpenAI API, and soundfile to read audio from our buffer file. Playsound will be used as the simplest library to play an mp3 file already fully saved to disk, for our download-first, non-streaming example. We also import threading, as we will need it for the streaming example: gathering the data on a separate thread while we play the audio on the main thread simultaneously.
Now start with some setup:
```python
API_KEY = config("OPENAI_API_KEY")
client = OpenAI(api_key=API_KEY)
current_directory = Path(__file__).parent
buffer_file_location = current_directory / "buffer.opus"

Model = Literal["tts-1", "tts-1-hd"]
Voice = Literal["alloy", "echo", "fable", "onyx", "nova", "shimmer"]
```
We will use our API_KEY several times, so we load it into a variable.
We set up our client and current_directory path as is well familiar by now, and the two type aliases from the previous part make a reappearance to define the possible options.
The buffer_file_location is a Path object that points to a file called "buffer.opus" in our current directory. We will use this to store our streamed audio while playing it (more on this later).
Download and Then Play
Now let’s code up the first example, which simply downloads the whole text-to-speech audio response and then plays it after it has been fully downloaded:
```python
def download_and_play(input: str, model: Model = "tts-1", voice: Voice = "onyx"):
    start_time = time()
    response = client.audio.speech.create(
        model=model,
        voice=voice,
        input=input,
    )
    # Note that at this point the whole audio file has been downloaded and is
    # stored in memory in its entirety. The 'stream_to_file' method is a bit of
    # a misnomer, as it doesn't actually stream the audio here but just saves
    # it to a file.
    response.stream_to_file(f"{current_directory}/audio.mp3")
    time_until_playback = time() - start_time
    print(f"Time until playback: {time_until_playback} seconds")
    playsound(f"{current_directory}/audio.mp3")
```
We take an input string, a model, and a voice as arguments, setting defaults for the model and voice. We then call time() to get a reference to the exact current time, so we can measure how long it takes for our audio to start playing from the initial request.
We get a response by calling client.audio.speech.create and passing in the given arguments. We then call the stream_to_file method on the response object to save the response to a file. Note that the entire response has already been received at this point, so this is not really streaming, as you will see when we compare times later on.
Finally, we calculate the time it took for the audio to start playing by getting the current time and subtracting the starting time, then printing it out. We then call playsound to play the file we just saved to disk.
Try it out by adding a print statement and running the file:
download_and_play("This is a test.")
You should hear the onyx voice say "This is a test." and see the time it took until the audio started playing, in seconds:
```
Time until playback: 1.0087647438049316 seconds
```
Now change the above download_and_play call like this:
```python
download_and_play(
    "This is a test. When the test gets much longer the difference will be much more obvious as to the speed in generation times taken for the test."
)
```
Now run the file again (making sure you removed the earlier call to the function) and you will see the issue:
```
Time until playback: 2.1658453941345215 seconds
```
The time it takes to generate the audio grows roughly linearly with the length of the text, and for a long answer this will easily reach 15 seconds or more. That's a big problem if we want anyone to actually have the patience to use our amazing talking GPT.
Streaming the Audio During Generation
Make sure you comment out the download_and_play("…") line before continuing, as we don't want to keep running this test.
For the second example, we will be streaming the response and playing it as it is still being generated. This will greatly speed up the loading time and cut down on the waiting between the question asked and the audio playing.
Note that the OpenAI library doesn't yet handle audio streaming properly, so we cannot use the client.audio.speech.create method that we would normally call.
We’ll send a manual request to the API endpoint for streaming purposes, manually handling the request-response cycle.
I've not yet seen a proper implementation of this on the internet, nor are there any examples in the documentation. Some of the solutions I have seen download the entire file before playing it while mistakenly presenting this as streaming, so they still take a long time. I will show you how to really stream the audio, playing it before the full file has been downloaded, though it will get a little bit hacky.
Let's go over this function in parts, as it is a bit longer. After the full explanation we'll show the entire function in a single block again so that you can copy it properly, since the explanation blocks will be somewhat long on our first pass:
```python
def stream_audio(input: str, model: Model = "tts-1", voice: Voice = "onyx"):
    start_time = time()
    py_audio = pyaudio.PyAudio()
```
Again, we take an input string plus a model and a voice from the possibilities in our Model and Voice type aliases, providing a default value for both. We then call time() to get a reference to the current exact time so we can calculate the time taken later on.
We then create an instance of the PyAudio class from the pyaudio library and catch it under the variable name py_audio. With pyaudio, you can easily use Python to play and record audio on a variety of platforms.
The py_audio variable now holds a PyAudio object, which can be used to control audio streams. For example, you could open a new stream and start playing audio, or even record audio from a microphone. We'll see how the playing works in a moment.
Continue the function:
url = "https://api.openai.com/v1/audio/speech" headers = { "Authorization": f"Bearer {API_KEY}", } response = requests.post( url, headers=headers, json={ "model": model, "input": input, "voice": voice, "response_format": "opus", }, stream=True, )
We define the URL endpoint, as we’ll be calling it manually without using the client object this time.
We then define the authorization header which will hold our API key to authorize our request to the OpenAI API. Normally the OpenAI library handles sending this header behind the scenes.
Now we make a POST request using the requests library to the speech endpoint we defined in url, sending along our headers and adding a json object. The json object contains the arguments we would normally pass to the client.audio.speech.create method, but we also add a "response_format" argument, which we set to "opus".
OPUS is an audio format that was developed for streaming audio over the internet. This alone will cut down the time taken to receive the audio.
Finally, note that we pass stream=True to the request, which is not yet possible with the OpenAI library, though it might be by the time you're reading this! This is one of the reasons we're making a manual API request.
```python
    if response.status_code == 200:
        chunk_size = 4096
        buffer = io.BytesIO()
```
If the response status code is 200, which means the request was successful, we continue. We define a chunk_size of 4096 bytes. We then create a BytesIO object, which is a stream implementation using an in-memory bytes buffer; basically, Python creates a small buffer in memory where we can store our stream.
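If BytesIO is new to you, this tiny standalone sketch shows the behavior we'll rely on:

```python
import io

buffer = io.BytesIO()
buffer.write(b"hello ")
buffer.write(b"world")
print(buffer.tell())      # 11 -> current position, i.e. bytes written so far
print(buffer.getvalue())  # b'hello world'
```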
A quick side note before we move on. We will be writing our audio both to this stream and to a physical file on disk.
This is technically redundant, but it is done to keep this tutorial as simple as possible. If you build a production-grade implementation you will want to get rid of this duplication, but for our purposes it's not a big deal, and it makes it much easier to show what is going on.
With that out of the way, let's define an inner function (still inside the stream_audio function) that handles collecting our data:
```python
def stream_audio(...):
    ...
    if response.status_code == 200:
        ...

        def collect_data():
            with open(buffer_file_location, "wb") as f:
                chunks_written = 0
                for chunk in response.iter_content(chunk_size=chunk_size):
                    buffer.write(chunk)
                    f.write(chunk)
                    chunks_written += 1
                    if chunks_written % 3 == 0:
                        print(f"Buffer size: {buffer.tell()} bytes")
```
We define a collect_data function that takes no arguments. We open the buffer file in write-binary mode, catching it under the variable name f. We define a chunks_written variable, set it to 0, and then loop over the response object's iter_content method, which returns the response data in chunks of the given chunk_size. This means we can start looping over the response data before the entire response has been received.
We write each chunk to our buffer and to the file, incrementing the chunks_written variable by 1. We then check if chunks_written is divisible by 3, just to limit the number of print messages, and if so we print out the current size of our buffer using buffer.tell(). This is just to show you how the buffer grows as we write to it.
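If iter_content is new to you, here is a tiny standalone sketch of the general pattern, using a placeholder URL:

```python
import requests

# https://example.com is just a placeholder page for demonstration
response = requests.get("https://example.com", stream=True)
for chunk in response.iter_content(chunk_size=1024):
    print(f"received {len(chunk)} bytes")
```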
Now that we have a function to collect the data as it comes in and write it to our buffer and file, we need another function to play audio of unknown length while it is still coming in. We'll call this function play_audio:
```python
def stream_audio(...):
    ...
    if response.status_code == 200:
        ...

        def collect_data():
            ...

        def play_audio():
            with sf.SoundFile(buffer_file_location, "r") as audio_file:
                stream = py_audio.open(
                    format=pyaudio.paInt16,
                    channels=audio_file.channels,
                    rate=audio_file.samplerate,
                    output=True,
                )
                dtype = "int16"
                data = audio_file.read(chunk_size, dtype=dtype)
                while len(data) > 0:
                    stream.write(data.tobytes())
                    data = audio_file.read(chunk_size, dtype=dtype)
                stream.stop_stream()
                stream.close()
```
We open the buffer file (buffer.opus) that the collect_data function will simultaneously be writing to. We use the with context manager to open the file using the sf.SoundFile class in read mode, catching it under the variable name audio_file.
Now that we have the file open, we can create a stream using the py_audio object we created earlier. We call py_audio.open and pass in the format, which we set to pyaudio.paInt16; this refers to signed 16-bit integer samples, the format we'll read the audio data in. We pass in the channels and the sample rate, both of which we get from the audio_file object by calling audio_file.channels and audio_file.samplerate respectively. We then set output to True, as we want to output the audio to our speakers.
After declaring a simple string variable to hold the int16 datatype, we read one chunk_size worth of data from the audio buffer file we opened. We then enter a while loop that runs as long as the length of the data is greater than 0, writing the data to the stream using stream.write and passing in the data after calling .tobytes() on it. We then read the next chunk, and as long as there is more data to read the while loop keeps running. Because we set output to True when creating the stream, the audio starts playing as soon as we start writing to the stream. Once there is no more data to read, we exit the while loop and stop and close the stream.
Threading
Ok, so now that we have functions for both the data collection and the audio playback, the challenge is that we need them running at the same time: we want to start playing the audio while we are still downloading it.
This is where threading comes in.
Below and outside of the play_audio function, continue:
```python
def stream_audio(...):
    ...
    if response.status_code == 200:
        ...

        def collect_data():
            ...

        def play_audio():
            ...

        collect_data_thread = threading.Thread(target=collect_data, daemon=True)
        collect_data_thread.start()
```
We initialize a new thread by calling threading.Thread and passing in the target function for the thread to run (which is why collect_data is a separate function).
We set daemon to True. Without going into too much detail about threading, this means that if the main thread (the thread running the rest of our code) exits, the daemon thread is automatically killed as well.
We then call .start() to get the thread running, meaning our data collection starts in the background, kind of like using async code or promises in JavaScript.
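If daemon threads are new to you, this minimal standalone sketch shows the pattern:

```python
import threading
import time

def background_work():
    # Runs concurrently with the main thread
    for i in range(3):
        print(f"working... {i}")
        time.sleep(0.1)

worker = threading.Thread(target=background_work, daemon=True)
worker.start()
print("main thread continues immediately")
worker.join()  # optional: block until the worker finishes
```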
Continue on:
```python
        while buffer.tell() < chunk_size:
            sleep(0.2)

        time_until_playback = time() - start_time
        print(f"Time until playback: {time_until_playback} seconds")

        play_audio()
```
We use buffer.tell() to check the size of the buffer; we want to make sure it's at least the size of our chunk_size before we start playing the audio, as trying to play an empty or nonexistent audio file would crash us out. We use a while loop to check this, and if the buffer isn't big enough yet we sleep for 0.2 seconds and check again (it's a bit crude, but let's keep the code simple here).
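As a side note, a slightly cleaner pattern than polling would be a threading.Event that the collector sets once the first chunk has arrived; here's a minimal standalone sketch of that idea (not part of the tutorial code):

```python
import threading
import time

buffer_ready = threading.Event()

def collector():
    # Simulate downloading: pretend the first chunk arrives after 0.5 seconds
    time.sleep(0.5)
    buffer_ready.set()  # signal the main thread that playback can start

threading.Thread(target=collector, daemon=True).start()
buffer_ready.wait(timeout=10)  # blocks until set() is called (or timeout)
print("First chunk ready, playback can start")
```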
Once the buffer is at least the size of our chunk_size, we calculate the time it took for the audio to start playing by getting the current time and subtracting the starting time, then printing it out. We then call our play_audio function to start playing the partial audio file on the main thread while the collect_data function is still writing to it on the other thread.
Now, going back out to the if statement we were inside, let's define the else portion:
```python
def stream_audio(...):
    ...
    if response.status_code == 200:
        ...

        def collect_data():
            ...

        def play_audio():
            ...

        ...
    else:
        print(f"Error: {response.status_code} - {response.text}")

    py_audio.terminate()
```
If there is an error, we print out the status code and the response text. We then call py_audio.terminate() to clean up the py_audio object we created earlier, as it is no longer needed. We made it!
Here is the function once more in its entirety without the explanation, make sure you copied it correctly:
```python
def stream_audio(input: str, model: Model = "tts-1", voice: Voice = "onyx"):
    start_time = time()
    py_audio = pyaudio.PyAudio()

    url = "https://api.openai.com/v1/audio/speech"
    headers = {
        "Authorization": f"Bearer {API_KEY}",
    }
    response = requests.post(
        url,
        headers=headers,
        json={
            "model": model,
            "input": input,
            "voice": voice,
            "response_format": "opus",
        },
        stream=True,
    )

    if response.status_code == 200:
        chunk_size = 4096
        buffer = io.BytesIO()

        def collect_data():
            with open(buffer_file_location, "wb") as f:
                chunks_written = 0
                for chunk in response.iter_content(chunk_size=chunk_size):
                    buffer.write(chunk)
                    f.write(chunk)
                    chunks_written += 1
                    if chunks_written % 3 == 0:
                        print(f"Buffer size: {buffer.tell()} bytes")

        def play_audio():
            with sf.SoundFile(buffer_file_location, "r") as audio_file:
                stream = py_audio.open(
                    format=pyaudio.paInt16,
                    channels=audio_file.channels,
                    rate=audio_file.samplerate,
                    output=True,
                )
                dtype = "int16"
                data = audio_file.read(chunk_size, dtype=dtype)
                while len(data) > 0:
                    stream.write(data.tobytes())
                    data = audio_file.read(chunk_size, dtype=dtype)
                stream.stop_stream()
                stream.close()

        collect_data_thread = threading.Thread(target=collect_data, daemon=True)
        collect_data_thread.start()

        while buffer.tell() < chunk_size:
            sleep(0.2)

        time_until_playback = time() - start_time
        print(f"Time until playback: {time_until_playback} seconds")

        play_audio()
    else:
        print(f"Error: {response.status_code} - {response.text}")

    py_audio.terminate()
```
Testing
Ok, so now let's go and test it out. Make sure you don't have any other calls to the download_and_play or stream_audio functions in your file, and then add the following, getting a benchmark time on download_and_play first:
```python
download_and_play(
    "This is a test. When the test gets much longer the difference will be much more obvious as to the speed in generation times taken for the test. The download and play will keep taking longer and longer where the streaming option will stay constant. Keep in mind a general ChatGPT answer will be longer than this test text so the difference will be considerably bigger."
)
```
Run the file and you should see the time it took to play the audio:
```
Time until playback: 4.239435434341431 seconds
```
Over four seconds already, and keep in mind that most ChatGPT answers will be considerably longer than this, keeping the user waiting for 10 to 15 seconds before they ever hear anything.
Replace/remove the above call with an identical call, but this time to stream_audio:
```python
stream_audio(
    "This is a test. When the test gets much longer the difference will be much more obvious as to the speed in generation times taken for the test. The download and play will keep taking longer and longer where the streaming option will stay constant. Keep in mind a general ChatGPT answer will be longer than this test text so the difference will be considerably bigger."
)
```
Now run the file again and you’ll see the following output:
```
Buffer size: 12288 bytes
Time until playback: 1.5114212036132812 seconds
Buffer size: 20480 bytes
Buffer size: 28672 bytes
Buffer size: 40960 bytes
Buffer size: 53248 bytes
Buffer size: 65536 bytes
Buffer size: 77824 bytes
Buffer size: 90112 bytes
Buffer size: 102400 bytes
Buffer size: 112261 bytes
```
We can see the buffer building, as we made sure to print its size once every three chunks we save. After about 1.5 seconds the audio already starts playing, even though you can clearly see the buffer is still building in the background. This is the power of streaming! No matter how long the text, the time until playback starts will stay roughly constant.
So make sure you remove the stream_audio() test call we added above, and any other test calls you may have created, and let's move on to the final part of this tutorial: a talking GPT that will take our question and then speak the answer to us.
Combining TTS and GPT
This will be relatively easy after what we’ve just been through!
```python
def talking_gpt(query, streaming: bool = False):
    response = client.chat.completions.create(
        model="gpt-3.5-turbo-1106",
        messages=[{"role": "user", "content": query}],
        temperature=0.7,
    )
    content = response.choices[0].message.content
    print(content)
    try:
        print("Loading audio... (Ctrl+C to stop loading/playing)")
        if streaming:
            stream_audio(content)
        else:
            download_and_play(content)
    except KeyboardInterrupt:
        print("Stopping playback...")
        exit(0)
```
We create a function called talking_gpt that takes a query and a streaming argument, which is a boolean that defaults to False.
We then call the client.chat.completions.create method to get a response from the newest GPT-3.5 Turbo model, passing in the query as a message from the user.
We simply get the content from the response ChatGPT sent us and print it out.
We then open a try/except block, as in the except part we want to catch a KeyboardInterrupt exception, which is raised when we press Ctrl+C in the terminal. This way, if the user presses Ctrl+C, we can stop the loading and exit the program.
Inside the try part of the block, we check whether the streaming variable is set to True, calling either the stream_audio or download_and_play function accordingly. We pass in the content we got back from ChatGPT as the input string.
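As a quick sanity check, you could already call it directly (a hypothetical one-off test; remove it again before adding the command-line interface below):

```python
talking_gpt("Give me a one-sentence fun fact about the ocean.", streaming=True)
```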
Command Line Program
Now let's add a main block to our file, below and outside the talking_gpt function, so we can turn this into a quick command-line program:
```python
if __name__ == "__main__":
    while True:
        stream_or_not: str = input("Stream audio? (y/n): ")
        try:
            stream_or_not: bool = strtobool(stream_or_not)
            break
        except ValueError:
            print("Please enter y or n")

    question = input("Enter your question: ")
    talking_gpt(question, streaming=stream_or_not)
```
We start with an if statement that checks whether the __name__ variable is equal to "__main__". This is a special variable that is set to "__main__" when the file is run directly, but not when it is imported as a module inside other code.
While True creates an infinite loop, as True always evaluates to True.
We then ask the user if they want to stream the audio or not, catching the string input under the variable name stream_or_not. We then try to convert the string input to a boolean using the strtobool function we imported earlier.
If the value is one that strtobool can convert, like 'y', 'yes', 'n', or 'no', we break out of the infinite loop using break.
If the value is not valid, we catch the ValueError exception and print a message asking the user to enter a valid value. As we're inside the infinite loop, the user will be prompted again.
Once the user has entered a valid yes-or-no answer to the streaming question, we ask the user for their question to ChatGPT, catching it under the variable name question. We then call the talking_gpt function, passing in the question and the streaming choice we got from the user.
Go ahead and run this file:
```
Stream audio? (y/n):
```
Make your choice and press enter, then add your query and press enter:
```
Stream audio? (y/n): y
Enter your question: Who is Spongebob?
```
Depending on whether or not you choose streaming, your response will be very fast or very slow, so you can really see the difference.
```
Stream audio? (y/n): y
Enter your question: Who is Spongebob?
SpongeBob SquarePants is a fictional character and the main protagonist of the animated television series of the same name. He is a yellow sea sponge who lives in a pineapple under the sea in the fictional underwater city of Bikini Bottom. SpongeBob is known for his cheerful personality, love of his job as a fry cook at the Krusty Krab, and his friendship with his best friend Patrick Star.
Loading audio... (Ctrl+C to stop loading/playing)
Buffer size: 12288 bytes
Time until playback: 1.6946372985839844 seconds
Buffer size: 20480 bytes
Buffer size: 32768 bytes
Buffer size: 45056 bytes
Buffer size: 57344 bytes
Buffer size: 69632 bytes
Buffer size: 81920 bytes
Buffer size: 94208 bytes
Buffer size: 106490 bytes
Buffer size: 114688 bytes
Buffer size: 124242 bytes
```
So that’s it for this part of the tutorial series!
I'll see you soon in the next part, where we'll be looking at OpenAI Assistants, which are very similar to LangChain Agents and will feel familiar if you followed my LangChain course here at the Finxter Academy.
👉 Very cool stuff and I’ll see you there soon!