|
| 1 | +from datetime import datetime, timedelta, timezone |
| 2 | +from typing import Callable, Dict, Optional, Union |
| 3 | + |
| 4 | +from office365.runtime.auth.auth_cookies import AuthCookies |
| 5 | +from office365.runtime.auth.authentication_provider import AuthenticationProvider |
| 6 | + |
| 7 | + |
| 8 | +class CookieAuthProvider(AuthenticationProvider): |
| 9 | + """Authentication provider that applies SharePoint Online browser-session cookies. |
| 10 | +
|
| 11 | + Accepts a cookie source callback or a prebuilt AuthCookies instance and sets |
| 12 | + the HTTP "Cookie" header on outgoing requests. Optionally, a TTL can be supplied |
| 13 | + to refresh cached cookies after a specified number of seconds. |
| 14 | + """ |
| 15 | + |
| 16 | + def __init__( |
| 17 | + self, |
| 18 | + cookie_source: Union[Callable[[], Dict[str, str]], AuthCookies], |
| 19 | + ttl_seconds: Optional[int] = None, |
| 20 | + ) -> None: |
| 21 | + super().__init__() |
| 22 | + self._cookie_source = cookie_source |
| 23 | + self._ttl_seconds = ttl_seconds |
| 24 | + self._cached_auth_cookies: Optional[AuthCookies] = None |
| 25 | + self._acquired_at: Optional[datetime] = None |
| 26 | + |
| 27 | + def refresh(self) -> None: |
| 28 | + """Clears the cached cookies so that the next request reacquires them.""" |
| 29 | + self._cached_auth_cookies = None |
| 30 | + self._acquired_at = None |
| 31 | + |
| 32 | + def _is_expired(self, now_utc: datetime) -> bool: |
| 33 | + if self._cached_auth_cookies is None: |
| 34 | + return True |
| 35 | + if self._ttl_seconds is None: |
| 36 | + return False |
| 37 | + if self._acquired_at is None: |
| 38 | + return True |
| 39 | + return now_utc >= self._acquired_at + timedelta(seconds=self._ttl_seconds) |
| 40 | + |
| 41 | + def _acquire_from_source(self) -> AuthCookies: |
| 42 | + source = self._cookie_source |
| 43 | + if callable(source): |
| 44 | + result = source() |
| 45 | + if isinstance(result, AuthCookies): |
| 46 | + cookies = result |
| 47 | + elif isinstance(result, dict): |
| 48 | + cookies = AuthCookies(result) |
| 49 | + else: |
| 50 | + raise ValueError( |
| 51 | + "cookie_source must return Dict[str, str] or AuthCookies" |
| 52 | + ) |
| 53 | + elif isinstance(source, AuthCookies): |
| 54 | + cookies = source |
| 55 | + else: |
| 56 | + raise ValueError( |
| 57 | + "cookie_source must be a Callable[[], Dict[str, str]] or AuthCookies" |
| 58 | + ) |
| 59 | + |
| 60 | + if not cookies.is_valid: |
| 61 | + raise ValueError("Provided cookies are not valid for SharePoint Online.") |
| 62 | + return cookies |
| 63 | + |
| 64 | + def _ensure_cookies_cached(self) -> None: |
| 65 | + now_utc = datetime.now(timezone.utc) |
| 66 | + if self._is_expired(now_utc): |
| 67 | + cookies = self._acquire_from_source() |
| 68 | + self._cached_auth_cookies = cookies |
| 69 | + self._acquired_at = now_utc |
| 70 | + |
| 71 | + def authenticate_request(self, request) -> None: |
| 72 | + """Sets the Cookie header using cached or freshly acquired cookies.""" |
| 73 | + self._ensure_cookies_cached() |
| 74 | + request.set_header("Cookie", self._cached_auth_cookies.cookie_header) |
0 commit comments