Skip to content

Add Support for a JWT Leeway Parameter #430

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 163 additions & 0 deletions tests/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from unittest.mock import Mock, patch
import jwt
from datetime import datetime, timezone
import time

from tests.conftest import with_jwks_mock
from workos.session import AsyncSession, Session
Expand Down Expand Up @@ -394,6 +395,168 @@ def test_refresh_success_with_aud_claim(

assert isinstance(response, RefreshWithSessionCookieSuccessResponse)

@with_jwks_mock
def test_authenticate_with_slightly_expired_jwt_fails_without_leeway(
self, session_constants, mock_user_management
):
# Create a token that's expired by 5 seconds
current_time = int(time.time())

# Create token claims with exp 5 seconds in the past
token_claims = {
**session_constants["TEST_TOKEN_CLAIMS"],
"exp": current_time - 5, # Expired by 5 seconds
"iat": current_time - 60, # Issued 60 seconds ago
}

slightly_expired_token = jwt.encode(
token_claims,
session_constants["PRIVATE_KEY"],
algorithm="RS256",
)

# Prepare sealed session data with the slightly expired token
session_data = Session.seal_data(
{"access_token": slightly_expired_token, "user": session_constants["TEST_USER"]},
session_constants["COOKIE_PASSWORD"],
)

# With default leeway=0, authentication should fail
session = Session(
user_management=mock_user_management,
client_id=session_constants["CLIENT_ID"],
session_data=session_data,
cookie_password=session_constants["COOKIE_PASSWORD"],
jwt_leeway=0,
)

response = session.authenticate()
assert response.authenticated is False
assert response.reason == AuthenticateWithSessionCookieFailureReason.INVALID_JWT

@with_jwks_mock
def test_authenticate_with_slightly_expired_jwt_succeeds_with_leeway(
self, session_constants, mock_user_management
):
# Create a token that's expired by 5 seconds
current_time = int(time.time())

# Create token claims with exp 5 seconds in the past
token_claims = {
**session_constants["TEST_TOKEN_CLAIMS"],
"exp": current_time - 5, # Expired by 5 seconds
"iat": current_time - 60, # Issued 60 seconds ago
}

slightly_expired_token = jwt.encode(
token_claims,
session_constants["PRIVATE_KEY"],
algorithm="RS256",
)

# Prepare sealed session data with the slightly expired token
session_data = Session.seal_data(
{"access_token": slightly_expired_token, "user": session_constants["TEST_USER"]},
session_constants["COOKIE_PASSWORD"],
)

# With leeway=10, authentication should succeed
session = Session(
user_management=mock_user_management,
client_id=session_constants["CLIENT_ID"],
session_data=session_data,
cookie_password=session_constants["COOKIE_PASSWORD"],
jwt_leeway=10, # 10 seconds leeway
)

response = session.authenticate()
assert response.authenticated is True
assert response.session_id == session_constants["TEST_TOKEN_CLAIMS"]["sid"]

@with_jwks_mock
def test_authenticate_with_significantly_expired_jwt_fails_without_leeway(
self, session_constants, mock_user_management
):
# Create a token that's expired by 60 seconds
current_time = int(time.time())

# Create token claims with exp 60 seconds in the past
token_claims = {
**session_constants["TEST_TOKEN_CLAIMS"],
"exp": current_time - 60, # Expired by 60 seconds
"iat": current_time - 120, # Issued 120 seconds ago
}

significantly_expired_token = jwt.encode(
token_claims,
session_constants["PRIVATE_KEY"],
algorithm="RS256",
)

# Prepare sealed session data with the significantly expired token
session_data = Session.seal_data(
{
"access_token": significantly_expired_token,
"user": session_constants["TEST_USER"]
},
session_constants["COOKIE_PASSWORD"],
)

# With default leeway=0, authentication should fail
session = Session(
user_management=mock_user_management,
client_id=session_constants["CLIENT_ID"],
session_data=session_data,
cookie_password=session_constants["COOKIE_PASSWORD"],
jwt_leeway=0,
)

response = session.authenticate()
assert response.authenticated is False
assert response.reason == AuthenticateWithSessionCookieFailureReason.INVALID_JWT

@with_jwks_mock
def test_authenticate_with_significantly_expired_jwt_fails_with_insufficient_leeway(
self, session_constants, mock_user_management
):
# Create a token that's expired by 60 seconds
current_time = int(time.time())

# Create token claims with exp 60 seconds in the past
token_claims = {
**session_constants["TEST_TOKEN_CLAIMS"],
"exp": current_time - 60, # Expired by 60 seconds
"iat": current_time - 120, # Issued 120 seconds ago
}

significantly_expired_token = jwt.encode(
token_claims,
session_constants["PRIVATE_KEY"],
algorithm="RS256",
)

# Prepare sealed session data with the significantly expired token
session_data = Session.seal_data(
{
"access_token": significantly_expired_token,
"user": session_constants["TEST_USER"]
},
session_constants["COOKIE_PASSWORD"],
)

# With leeway=10, authentication should still fail (not enough leeway)
session = Session(
user_management=mock_user_management,
client_id=session_constants["CLIENT_ID"],
session_data=session_data,
cookie_password=session_constants["COOKIE_PASSWORD"],
jwt_leeway=10, # 10 seconds leeway is not enough for 60 seconds expiration
)

response = session.authenticate()
assert response.authenticated is False
assert response.reason == AuthenticateWithSessionCookieFailureReason.INVALID_JWT


class TestAsyncSession(SessionFixtures):
@with_jwks_mock
Expand Down
8 changes: 8 additions & 0 deletions workos/_base_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class BaseClient(ClientConfiguration):
_base_url: str
_client_id: str
_request_timeout: int
_jwt_leeway: float

def __init__(
self,
Expand All @@ -33,6 +34,7 @@ def __init__(
client_id: Optional[str],
base_url: Optional[str] = None,
request_timeout: Optional[int] = None,
jwt_leeway: float = 0,
) -> None:
api_key = api_key or os.getenv("WORKOS_API_KEY")
if api_key is None:
Expand Down Expand Up @@ -63,6 +65,8 @@ def __init__(
if request_timeout
else int(os.getenv("WORKOS_REQUEST_TIMEOUT", DEFAULT_REQUEST_TIMEOUT))
)

self._jwt_leeway = jwt_leeway

@property
@abstractmethod
Expand Down Expand Up @@ -122,3 +126,7 @@ def client_id(self) -> str:
@property
def request_timeout(self) -> int:
return self._request_timeout

@property
def jwt_leeway(self) -> float:
return self._jwt_leeway
2 changes: 2 additions & 0 deletions workos/async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@ def __init__(
client_id: Optional[str] = None,
base_url: Optional[str] = None,
request_timeout: Optional[int] = None,
jwt_leeway: float = 0,
):
super().__init__(
api_key=api_key,
client_id=client_id,
base_url=base_url,
request_timeout=request_timeout,
jwt_leeway=jwt_leeway,
)
self._http_client = AsyncHTTPClient(
api_key=self._api_key,
Expand Down
2 changes: 2 additions & 0 deletions workos/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@ def __init__(
client_id: Optional[str] = None,
base_url: Optional[str] = None,
request_timeout: Optional[int] = None,
jwt_leeway: float = 0,
):
super().__init__(
api_key=api_key,
client_id=client_id,
base_url=base_url,
request_timeout=request_timeout,
jwt_leeway=jwt_leeway,
)
self._http_client = SyncHTTPClient(
api_key=self._api_key,
Expand Down
11 changes: 11 additions & 0 deletions workos/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class SessionModule(Protocol):
cookie_password: str
jwks: PyJWKClient
jwk_algorithms: List[str]
jwt_leeway: float

def __init__(
self,
Expand All @@ -36,6 +37,7 @@ def __init__(
client_id: str,
session_data: str,
cookie_password: str,
jwt_leeway: float = 0,
) -> None:
# If the cookie password is not provided, throw an error
if cookie_password is None or cookie_password == "":
Expand All @@ -45,6 +47,7 @@ def __init__(
self.client_id = client_id
self.session_data = session_data
self.cookie_password = cookie_password
self.jwt_leeway = jwt_leeway

self.jwks = PyJWKClient(self.user_management.get_jwks_url())

Expand Down Expand Up @@ -89,6 +92,7 @@ def authenticate(
signing_key.key,
algorithms=self.jwk_algorithms,
options={"verify_aud": False},
leeway=self.jwt_leeway,
)

return AuthenticateWithSessionCookieSuccessResponse(
Expand Down Expand Up @@ -136,6 +140,7 @@ def _is_valid_jwt(self, token: str) -> bool:
signing_key.key,
algorithms=self.jwk_algorithms,
options={"verify_aud": False},
leeway=self.jwt_leeway,
)
return True
except jwt.exceptions.InvalidTokenError:
Expand Down Expand Up @@ -167,6 +172,7 @@ def __init__(
client_id: str,
session_data: str,
cookie_password: str,
jwt_leeway: float = 0,
) -> None:
# If the cookie password is not provided, throw an error
if cookie_password is None or cookie_password == "":
Expand All @@ -176,6 +182,7 @@ def __init__(
self.client_id = client_id
self.session_data = session_data
self.cookie_password = cookie_password
self.jwt_leeway = jwt_leeway

self.jwks = PyJWKClient(self.user_management.get_jwks_url())

Expand Down Expand Up @@ -228,6 +235,7 @@ def refresh(
signing_key.key,
algorithms=self.jwk_algorithms,
options={"verify_aud": False},
leeway=self.jwt_leeway,
)

return RefreshWithSessionCookieSuccessResponse(
Expand Down Expand Up @@ -257,6 +265,7 @@ def __init__(
client_id: str,
session_data: str,
cookie_password: str,
jwt_leeway: float = 0,
) -> None:
# If the cookie password is not provided, throw an error
if cookie_password is None or cookie_password == "":
Expand All @@ -266,6 +275,7 @@ def __init__(
self.client_id = client_id
self.session_data = session_data
self.cookie_password = cookie_password
self.jwt_leeway = jwt_leeway

self.jwks = PyJWKClient(self.user_management.get_jwks_url())

Expand Down Expand Up @@ -318,6 +328,7 @@ async def refresh(
signing_key.key,
algorithms=self.jwk_algorithms,
options={"verify_aud": False},
leeway=self.jwt_leeway,
)

return RefreshWithSessionCookieSuccessResponse(
Expand Down
2 changes: 2 additions & 0 deletions workos/user_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -866,6 +866,7 @@ def load_sealed_session(
client_id=self._http_client.client_id,
session_data=sealed_session,
cookie_password=cookie_password,
jwt_leeway=self._client_configuration.jwt_leeway,
)

def get_user(self, user_id: str) -> User:
Expand Down Expand Up @@ -1491,6 +1492,7 @@ async def load_sealed_session(
client_id=self._http_client.client_id,
session_data=sealed_session,
cookie_password=cookie_password,
jwt_leeway=self._client_configuration.jwt_leeway,
)

async def get_user(self, user_id: str) -> User:
Expand Down
Loading