Router - Load Balancing, Fallbacks

LiteLLM manages:

  • Load balancing across multiple deployments (e.g. Azure/OpenAI)
  • Prioritizing important requests to ensure they don't fail (i.e. queueing)
  • Basic reliability logic: cooldowns, fallbacks, timeouts, and retries (fixed + exponential backoff) across multiple deployments/providers

In production, LiteLLM supports using Redis to track which deployments are in cooldown and to track usage (for managing tpm/rpm limits).

If you want a server to load balance across different LLM APIs, use our OpenAI Proxy Server

Load Balancing

(s/o @paulpierre and sweep proxy for their contributions to this implementation) See Code

Quick Start

import asyncio
import os

from litellm import Router

model_list = [{  # list of model deployments
    "model_name": "gpt-3.5-turbo",  # model alias
    "litellm_params": {  # params for litellm completion/embedding call
        "model": "azure/chatgpt-v-2",  # actual model name
        "api_key": os.getenv("AZURE_API_KEY"),
        "api_version": os.getenv("AZURE_API_VERSION"),
        "api_base": os.getenv("AZURE_API_BASE")
    }
}, {
    "model_name": "gpt-3.5-turbo",
    "litellm_params": {  # params for litellm completion/embedding call
        "model": "azure/chatgpt-functioncalling",
        "api_key": os.getenv("AZURE_API_KEY"),
        "api_version": os.getenv("AZURE_API_VERSION"),
        "api_base": os.getenv("AZURE_API_BASE")
    }
}, {
    "model_name": "gpt-3.5-turbo",
    "litellm_params": {  # params for litellm completion/embedding call
        "model": "gpt-3.5-turbo",
        "api_key": os.getenv("OPENAI_API_KEY"),
    }
}]

router = Router(model_list=model_list)

async def main():
    # openai.ChatCompletion.create replacement
    response = await router.acompletion(model="gpt-3.5-turbo",
                                        messages=[{"role": "user", "content": "Hey, how's it going?"}])
    print(response)

# `await` only works inside a coroutine, so run the call through asyncio
asyncio.run(main())

Available Endpoints

  • router.completion() - chat completions endpoint to call 100+ LLMs
  • router.acompletion() - async chat completion calls
  • router.embedding() - embedding endpoint for Azure, OpenAI, Huggingface endpoints
  • router.aembedding() - async embedding calls
  • router.text_completion() - completion calls in the old OpenAI /v1/completions endpoint format
  • router.atext_completion() - async text completion calls
  • router.image_generation() - image generation calls in the OpenAI /v1/images/generations endpoint format
  • router.aimage_generation() - async image generation calls

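Advanced

Routing Strategies - Weighted Pick, Rate Limit Aware, Least Busy, Latency Based

Router provides 4 strategies for routing your calls across multiple deployments: weighted pick ("simple-shuffle", the default), rate limit aware ("usage-based-routing"), least busy ("least-busy"), and latency based ("latency-based-routing"). The example below covers latency-based routing.

Latency-based routing picks the deployment with the lowest response time.

It caches and updates the response time for each deployment, measured from when a request was sent to when the response was received.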
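To make the selection rule concrete, here is a minimal standard-library sketch of "pick the deployment with the lowest average response time". LatencyTracker and its methods are hypothetical names used only for illustration; this is not the Router's internal implementation, and treating unmeasured deployments as fastest is an assumption of the sketch.

```python
from collections import defaultdict
from statistics import mean


class LatencyTracker:
    """Hypothetical helper: records per-deployment latencies and picks the fastest."""

    def __init__(self):
        # deployment id -> list of observed latencies in seconds
        self.latencies = defaultdict(list)

    def record(self, deployment_id, start_time, end_time):
        # Latency is the gap between sending the request and receiving the response.
        self.latencies[deployment_id].append(end_time - start_time)

    def average(self, deployment_id):
        samples = self.latencies.get(deployment_id)
        # Assumption: a deployment with no samples yet is treated as fastest,
        # so it gets tried at least once.
        return mean(samples) if samples else 0.0

    def pick(self, deployment_ids):
        # Choose the candidate with the lowest average latency.
        return min(deployment_ids, key=self.average)
```

For example, after recording 2.0s for deployment "a" and 0.5s for deployment "b", pick(["a", "b"]) returns "b".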

How to test

from litellm import Router
import asyncio

model_list = [{ ... }]

# init router
router = Router(model_list=model_list, routing_strategy="latency-based-routing") # 👈 set routing strategy

# assumed values for this example; use a model alias from your own model_list
model = "gpt-3.5-turbo"
messages = [{"role": "user", "content": "Hey, how's it going?"}]

async def test_latency_based_routing():
    ## CALL 1+2
    tasks = []
    response = None
    final_response = None
    for _ in range(2):
        tasks.append(router.acompletion(model=model, messages=messages))
    response = await asyncio.gather(*tasks)

    if response is not None:
        ## CALL 3
        await asyncio.sleep(1)  # let the cache update happen
        picked_deployment = router.lowestlatency_logger.get_available_deployments(
            model_group=model, healthy_deployments=router.healthy_deployments
        )
        final_response = await router.acompletion(model=model, messages=messages)
        print(f"min deployment id: {picked_deployment}")
        print(f"model id: {final_response._hidden_params['model_id']}")
        # the third call should land on the deployment the latency logger picked
        assert (
            final_response._hidden_params["model_id"]
            == picked_deployment["model_info"]["id"]
        )

asyncio.run(test_latency_based_routing())

Set Time Window

Set the time window for how far back to look when averaging a deployment's latency.

In Router

router = Router(..., routing_strategy_args={"ttl": 10})

In Proxy

router_settings:
  routing_strategy_args: {"ttl": 10}
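As a rough illustration of what a time window does to the average, the sketch below keeps only latency samples that are at most ttl old. WindowedLatency is a hypothetical class, and the assumption that ttl is measured in seconds belongs to this sketch, not to the text above.

```python
import time
from collections import deque


class WindowedLatency:
    """Hypothetical helper: average latency over the last `ttl` seconds only."""

    def __init__(self, ttl):
        self.ttl = ttl  # window length (assumed to be seconds)
        self.samples = deque()  # (timestamp, latency) pairs, oldest first

    def add(self, latency, now=None):
        now = time.time() if now is None else now
        self.samples.append((now, latency))

    def average(self, now=None):
        now = time.time() if now is None else now
        # Drop samples that have fallen out of the window.
        while self.samples and now - self.samples[0][0] > self.ttl:
            self.samples.popleft()
        if not self.samples:
            return None  # nothing recent enough to average
        return sum(latency for _, latency in self.samples) / len(self.samples)
```

A smaller window reacts faster to a deployment that has just slowed down, at the cost of averaging over fewer samples.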

Basic Reliability

Timeouts

The timeout set on the router applies to the entire length of the call, and is also passed down to the completion() call.

from litellm import Router

model_list = [{...}]

router = Router(model_list=model_list,
                timeout=30)  # raise timeout error if call takes > 30s

messages = [{"role": "user", "content": "Hey, how's it going?"}]
response = router.completion(model="gpt-3.5-turbo", messages=messages)

print(response)
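One way to picture a timeout that covers the entire call is a wrapper around an awaitable. The sketch below uses asyncio.wait_for from the standard library; call_with_timeout, fast_call, and slow_call are hypothetical names, and this is not a description of how Router enforces its timeout internally.

```python
import asyncio


async def call_with_timeout(make_call, timeout):
    """Hypothetical wrapper: bound the total duration of one awaitable call."""
    try:
        # wait_for cancels the inner call once `timeout` seconds have passed.
        return await asyncio.wait_for(make_call(), timeout=timeout)
    except asyncio.TimeoutError:
        raise TimeoutError(f"call exceeded {timeout}s") from None


async def fast_call():
    return "done"


async def slow_call():
    await asyncio.sleep(0.2)  # stands in for a slow LLM response
    return "done"
```

With these definitions, asyncio.run(call_with_timeout(fast_call, 1)) returns the result, while asyncio.run(call_with_timeout(slow_call, 0.01)) raises TimeoutError.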

Cooldowns

Set the limit for how many calls a model is allowed to fail in a minute before it is cooled down for a minute.

from litellm import Router

model_list = [{...}]

router = Router(model_list=model_list,
                allowed_fails=1)  # cooldown model if it fails > 1 call in a minute.

user_message = "Hello, whats the weather in San Francisco??"
messages = [{"content": user_message, "role": "user"}]

# normal call
response = router.completion(model="gpt-3.5-turbo", messages=messages)

print(f"response: {response}")
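The cooldown rule described above (more than allowed_fails failures within a minute puts a deployment on hold for a minute) can be sketched with a small in-memory tracker. CooldownTracker and its parameter names are hypothetical; the Router's own bookkeeping may differ.

```python
import time
from collections import defaultdict, deque


class CooldownTracker:
    """Hypothetical helper: cool a deployment down after too many recent failures."""

    def __init__(self, allowed_fails, window=60.0, cooldown_time=60.0):
        self.allowed_fails = allowed_fails
        self.window = window                # how far back failures are counted (seconds)
        self.cooldown_time = cooldown_time  # how long a deployment is skipped (seconds)
        self.failures = defaultdict(deque)  # deployment id -> failure timestamps
        self.cooldown_until = {}            # deployment id -> time it becomes usable again

    def record_failure(self, deployment_id, now=None):
        now = time.time() if now is None else now
        fails = self.failures[deployment_id]
        fails.append(now)
        # Keep only failures from the last `window` seconds.
        while fails and now - fails[0] > self.window:
            fails.popleft()
        if len(fails) > self.allowed_fails:
            self.cooldown_until[deployment_id] = now + self.cooldown_time
            fails.clear()

    def is_available(self, deployment_id, now=None):
        now = time.time() if now is None else now
        return now >= self.cooldown_until.get(deployment_id, 0.0)
```

With allowed_fails=1, a single failure leaves the deployment available, and a second failure within the same minute takes it out of rotation for the next 60 seconds.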

Retries

Retrying failed requests is supported for both async and sync functions.

For RateLimitError, we implement exponential backoff.

For generic errors, we retry immediately.

Here's a quick look at how to set num_retries = 3:

from litellm import Router

model_list = [{...}]

router = Router(model_list=model_list,
                num_retries=3)

user_message = "Hello, whats the weather in San Francisco??"
messages = [{"content": user_message, "role": "user"}]

# normal call
response = router.completion(model="gpt-3.5-turbo", messages=messages)

print(f"response: {response}")

We also support setting a minimum time to wait before retrying a failed request, via the retry_after param.

from litellm import Router

model_list = [{...}]

router = Router(model_list=model_list,
                num_retries=3, retry_after=5)  # waits min 5s before retrying request

user_message = "Hello, whats the weather in San Francisco??"
messages = [{"content": user_message, "role": "user"}]

# normal call
response = router.completion(model="gpt-3.5-turbo", messages=messages)

print(f"response: {response}")
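The retry behaviour described in this section (exponential backoff for rate-limit errors, immediate retry for generic errors, and an optional minimum wait) can be sketched as follows. The RateLimitError class here is a local stand-in, call_with_retries and base_delay are hypothetical, and treating num_retries as "retries after the first attempt" is an assumption of the sketch rather than a statement about the Router.

```python
import time


class RateLimitError(Exception):
    """Stand-in for a provider rate-limit error (hypothetical, for this sketch only)."""


def call_with_retries(fn, num_retries=3, retry_after=0.0, base_delay=1.0, sleep=time.sleep):
    """Call fn(), retrying up to num_retries times after the first attempt."""
    for attempt in range(num_retries + 1):
        try:
            return fn()
        except RateLimitError:
            if attempt == num_retries:
                raise
            # Exponential backoff: base_delay, 2x, 4x, ... but never below retry_after.
            sleep(max(retry_after, base_delay * 2 ** attempt))
        except Exception:
            if attempt == num_retries:
                raise
            # Generic errors are retried immediately unless a minimum wait is set.
            if retry_after > 0:
                sleep(retry_after)
```

Passing sleep as a parameter keeps the sketch easy to test: a list's append method can be supplied to record the delays instead of actually waiting.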

Fallbacks

If a call fails after num_retries, fall back to another model group.

If the error is a context window exceeded error, fall back to a larger model group (if given).

import os

from litellm import Router

model_list = [  # list of model deployments
    {
        "model_name": "azure/gpt-3.5-turbo",  # model group name
        "litellm_params": {  # params for litellm completion/embedding call
            "model": "azure/chatgpt-v-2",
            "api_key": "bad-key",
            "api_version": os.getenv("AZURE_API_VERSION"),
            "api_base": os.getenv("AZURE_API_BASE")
        },
        "tpm": 240000,
        "rpm": 1800
    },
    {
        "model_name": "azure/gpt-3.5-turbo-context-fallback",  # model group name
        "litellm_params": {  # params for litellm completion/embedding call
            "model": "azure/chatgpt-v-2",
            "api_key": "bad-key",
            "api_version": os.getenv("AZURE_API_VERSION"),
            "api_base": os.getenv("AZURE_API_BASE")
        },
        "tpm": 240000,
        "rpm": 1800
    },
    {
        "model_name": "azure/gpt-3.5-turbo",  # model group name
        "litellm_params": {  # params for litellm completion/embedding call
            "model": "azure/chatgpt-functioncalling",
            "api_key": "bad-key",
            "api_version": os.getenv("AZURE_API_VERSION"),
            "api_base": os.getenv("AZURE_API_BASE")
        },
        "tpm": 240000,
        "rpm": 1800
    },
    {
        "model_name": "gpt-3.5-turbo",  # model group name
        "litellm_params": {  # params for litellm completion/embedding call
            "model": "gpt-3.5-turbo",
            "api_key": os.getenv("OPENAI_API_KEY"),
        },
        "tpm": 1000000,
        "rpm": 9000
    },
    {
        "model_name": "gpt-3.5-turbo-16k",  # model group name
        "litellm_params": {  # params for litellm completion/embedding call
            "model": "gpt-3.5-turbo-16k",
            "api_key": os.getenv("OPENAI_API_KEY"),
        },
        "tpm": 1000000,
        "rpm": 9000
    }
]


router = Router(model_list=model_list,
                fallbacks=[{"azure/gpt-3.5-turbo": ["gpt-3.5-turbo"]}],
                context_window_fallbacks=[{"azure/gpt-3.5-turbo-context-fallback": ["gpt-3.5-turbo-16k"]},
                                          {"gpt-3.5-turbo": ["gpt-3.5-turbo-16k"]}],
                set_verbose=True)


user_message = "Hello, whats the weather in San Francisco??"
messages = [{"content": user_message, "role": "user"}]

# normal fallback call
response = router.completion(model="azure/gpt-3.5-turbo", messages=messages)

# context window fallback call
response = router.completion(model="azure/gpt-3.5-turbo-context-fallback", messages=messages)

print(f"response: {response}")
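To show how the two fallback lists in the example above might be consulted, here is a simplified sketch. It uses the same list-of-single-key-dicts shape as fallbacks and context_window_fallbacks, but ContextWindowExceeded, lookup, and complete_with_fallbacks are hypothetical names, and the retry step that precedes a fallback is left out for brevity.

```python
class ContextWindowExceeded(Exception):
    """Stand-in for a context-window error (hypothetical, for this sketch only)."""


def lookup(mapping_list, model_group):
    # Each entry is a single-key dict, e.g. {"azure/gpt-3.5-turbo": ["gpt-3.5-turbo"]}.
    for entry in mapping_list:
        if model_group in entry:
            return entry[model_group]
    return []


def complete_with_fallbacks(call, model_group, fallbacks, context_window_fallbacks):
    """Try model_group first, then the fallback groups that match the error type."""
    try:
        return call(model_group)
    except ContextWindowExceeded as err:
        # Context-window errors use the dedicated (larger-model) fallback list.
        candidates, last_error = lookup(context_window_fallbacks, model_group), err
    except Exception as err:
        # Every other error uses the general fallback list.
        candidates, last_error = lookup(fallbacks, model_group), err
    for group in candidates:
        try:
            return call(group)
        except Exception as err:
            last_error = err
    # No fallback succeeded (or none was configured): surface the last error.
    raise last_error
```

Here call is any function that takes a model group name and either returns a response or raises.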

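Caching

In production, we recommend using a Redis cache. For quickly testing things locally, we also support simple in-memory caching.

In-memory Cache

router = Router(model_list=model_list,
                cache_responses=True)

# model_list and messages are defined as in the examples above
response = router.completion(model="gpt-3.5-turbo", messages=messages)

print(response)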

Redis Cache

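router = Router(model_list=model_list,
                redis_host=os.getenv("REDIS_HOST"),
                redis_password=os.getenv("REDIS_PASSWORD"),
                redis_port=os.getenv("REDIS_PORT"),
                cache_responses=True)

# model_list and messages are defined as in the examples above
response = router.completion(model="gpt-3.5-turbo", messages=messages)

print(response)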

Pass in Redis URL, additional kwargs

router = Router(model_list=model_list,
                ## CACHING ##
                redis_url=os.getenv("REDIS_URL"),
                cache_kwargs={},  # additional kwargs to pass to RedisCache (see caching.py)
                cache_responses=True)

Caching across model groups

If you want to cache across 2 different model groups (e.g. azure deployments, and openai), use caching groups.

import asyncio
import os
import time
import traceback

import litellm
from litellm import Router

# set os env
os.environ["OPENAI_API_KEY"] = ""
os.environ["AZURE_API_KEY"] = ""
os.environ["AZURE_API_BASE"] = ""
os.environ["AZURE_API_VERSION"] = ""

async def test_acompletion_caching_on_router_caching_groups():
    # tests acompletion + caching on router
    try:
        litellm.set_verbose = True
        model_list = [
            {
                "model_name": "openai-gpt-3.5-turbo",
                "litellm_params": {
                    "model": "gpt-3.5-turbo-0613",
                    "api_key": os.getenv("OPENAI_API_KEY"),
                },
            },
            {
                "model_name": "azure-gpt-3.5-turbo",
                "litellm_params": {
                    "model": "azure/chatgpt-v-2",
                    "api_key": os.getenv("AZURE_API_KEY"),
                    "api_base": os.getenv("AZURE_API_BASE"),
                    "api_version": os.getenv("AZURE_API_VERSION")
                },
            }
        ]

        messages = [
            {"role": "user", "content": f"write a one sentence poem {time.time()}?"}
        ]
        start_time = time.time()
        router = Router(model_list=model_list,
                        cache_responses=True,
                        caching_groups=[("openai-gpt-3.5-turbo", "azure-gpt-3.5-turbo")])
        response1 = await router.acompletion(model="openai-gpt-3.5-turbo", messages=messages, temperature=1)
        print(f"response1: {response1}")
        await asyncio.sleep(1)  # add cache is async, async sleep for cache to get set
        response2 = await router.acompletion(model="azure-gpt-3.5-turbo", messages=messages, temperature=1)
        # the second call hits a different model group but should be served from the shared cache
        assert response1.id == response2.id
        assert len(response1.choices[0].message.content) > 0
        assert response1.choices[0].message.content == response2.choices[0].message.content
    except Exception as e:
        traceback.print_exc()

asyncio.run(test_acompletion_caching_on_router_caching_groups())
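A simple way to think about caching groups is that every model group listed in the same tuple maps to one shared cache key. The sketch below illustrates that idea with the standard library only; cache_key is a hypothetical function and does not reflect how LiteLLM actually builds its keys.

```python
import hashlib
import json


def cache_key(model_group, messages, caching_groups=None, **params):
    """Hypothetical key builder: groups listed together share one cache key."""
    key_model = model_group
    for group in caching_groups or []:
        if model_group in group:
            # Use the whole group as the identity, so every member maps to the same key.
            key_model = "|".join(sorted(group))
            break
    payload = json.dumps(
        {"model": key_model, "messages": messages, "params": params},
        sort_keys=True,
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()
```

With caching_groups=[("openai-gpt-3.5-turbo", "azure-gpt-3.5-turbo")], the same messages and params produce an identical key for either group, which is why the second call in the test above can be answered from the cache.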

Default litellm.completion/embedding params

You can also set default params for litellm completion/embedding calls. Here's how to do that:

from litellm import Router

fallback_dict = {"gpt-3.5-turbo": "gpt-3.5-turbo-16k"}

router = Router(model_list=model_list,
                default_litellm_params={"context_window_fallback_dict": fallback_dict})

user_message = "Hello, whats the weather in San Francisco??"
messages = [{"content": user_message, "role": "user"}]

# normal call
response = router.completion(model="gpt-3.5-turbo", messages=messages)

print(f"response: {response}")

Deploy Router

If you want a server to load balance across different LLM APIs, use our OpenAI Proxy Server

Init Params for the litellm.Router

def __init__(
    model_list: Optional[list] = None,

    ## CACHING ##
    redis_url: Optional[str] = None,
    redis_host: Optional[str] = None,
    redis_port: Optional[int] = None,
    redis_password: Optional[str] = None,
    cache_responses: Optional[bool] = False,
    cache_kwargs: dict = {},  # additional kwargs to pass to RedisCache (see caching.py)
    caching_groups: Optional[
        List[tuple]
    ] = None,  # if you want to cache across model groups
    client_ttl: int = 3600,  # ttl for cached clients - will re-initialize after this time in seconds

    ## RELIABILITY ##
    num_retries: int = 0,
    timeout: Optional[float] = None,
    default_litellm_params={},  # default params for Router.chat.completion.create
    set_verbose: bool = False,
    fallbacks: List = [],
    allowed_fails: Optional[int] = None,
    context_window_fallbacks: List = [],
    model_group_alias: Optional[dict] = {},
    retry_after: int = 0,  # min time to wait before retrying a failed request
    routing_strategy: Literal[
        "simple-shuffle",
        "least-busy",
        "usage-based-routing",
        "latency-based-routing",
    ] = "simple-shuffle",
):