# Source code for giphy.http
import aiohttp
from typing import Optional
import warnings
class Route:
    """Describe one API request: HTTP method, full URL and query parameters.

    Attributes are plain data that ``HTTPClient.request`` reads
    (``method``, ``url`` and ``params``).
    """

    # Prefix prepended to every endpoint to build the full request URL.
    BASE = "https://api.giphy.com/v1"

    def __init__(self, endpoint: str, params: Optional[dict] = None, method: str = "GET", **kwargs):
        """Build a route.

        Args:
            endpoint: Path appended to ``BASE``. May contain ``str.format``
                placeholders (e.g. ``"/gifs/{gif_id}"``) that are filled
                from ``kwargs``.
            params: Query parameters for the request. ``None`` is treated
                as "no parameters". A supplied dict is stored by reference,
                not copied.
            method: HTTP method name, ``"GET"`` by default.
            **kwargs: Values substituted into the placeholders of
                ``endpoint``.

        Raises:
            KeyError: If ``endpoint`` contains a placeholder that has no
                matching keyword argument.
        """
        self.method = method
        # The raw template is kept as given; only ``url`` is formatted.
        self.endpoint = endpoint
        self.url = self.BASE + endpoint.format(**kwargs)
        # Normalise a missing mapping so consumers can always call
        # ``params.get(...)`` without a None check.
        self.params = {} if params is None else params
class HTTPClient:
    """Small async HTTP helper that sends ``Route`` objects through aiohttp.

    The client holds an optional API key and an ``aiohttp.ClientSession``.
    The session may be supplied by the caller or created by
    ``open_session``.
    """

    def __init__(self, *, api_key: Optional[str] = None, session: Optional[aiohttp.ClientSession] = None):
        """Store the credentials and the (optional) pre-built session.

        Args:
            api_key: Key added as the ``api_key`` query parameter to
                requests that do not already carry one.
            session: Existing session to reuse. When omitted, one is
                created by ``open_session``.
        """
        self.__session = session
        self._auth = api_key

    async def open_session(self):
        """Ensure a session exists.

        Creates a ``ClientSession`` with ``raise_for_status=True`` when the
        client has none. When a session was supplied by the caller and its
        ``raise_for_status`` is falsy, a warning is emitted instead, because
        HTTP error statuses would then not be raised by the session.
        """
        if self.__session is None:
            self.__session = aiohttp.ClientSession(raise_for_status=True)
        elif not self.__session.raise_for_status:
            warnings.warn("raise_for_status is not enabled on your ClientSession. No HTTP Error raising is enabled!")

    async def request(self, route: Route) -> dict:
        """Send the request described by ``route`` and return the JSON body.

        Args:
            route: Supplies the HTTP method, URL and query parameters.

        Returns:
            The result of decoding the response body with ``resp.json()``.
        """
        # Open lazily so a client that was never opened (or was cleaned
        # up) does not fail with AttributeError on a None session.
        if self.__session is None:
            await self.open_session()

        query = route.params
        if self._auth and query.get("api_key") is None:
            # Merge into a new dict instead of updating ``route.params`` in
            # place: the key must not leak into the caller's dict, nor stick
            # to a Route that is later reused with a different client.
            query = {**query, "api_key": self._auth}

        async with self.__session.request(route.method, route.url, params=query) as resp:
            return await resp.json()

    async def cleanup(self):
        """Close the session, if any, and forget it.

        Safe to call when no session is open and safe to call repeatedly.
        """
        if self.__session is None:
            return
        await self.__session.close()
        self.__session = None