"""
Grants are the heart of OAuth 2.0. Each Grant defines one way for a client to
retrieve an authorization. They are defined in
`Section 4 <http://tools.ietf.org/html/rfc6749#section-4>`_ of the OAuth 2.0
spec.
OAuth 2.0 comes in two flavours of how an access token is issued:
two-legged and three-legged auth. To avoid confusion they are explained in
short here.
Three-legged OAuth
------------------
The "three" symbolizes the parties that are involved:
* The client that wants to access a resource on behalf of the user.
* The user who grants access to her resources.
* The server that issues the access token if the user allows it.
Two-legged OAuth
----------------
The two-legged OAuth process differs from the three-legged process by one
missing participant. The user cannot allow or deny access.
So there are two remaining parties:
* The client that wants to access a resource.
* The server that issues the access.
"""
from oauth2.error import OAuthInvalidError, UserNotAuthenticated, \
AccessTokenNotFound, UserIdentifierMissingError, RedirectUriUnknown, \
AuthCodeNotFound, InvalidSiteAdapter
from oauth2.compatibility import urlencode, quote
import json
import time
from oauth2.datatype import AuthorizationCode, AccessToken
from oauth2.web import Response, AuthorizationCodeGrantSiteAdapter, \
ImplicitGrantSiteAdapter, ResourceOwnerGrantSiteAdapter
def encode_scopes(scopes, use_quote=False):
"""
Creates a string out of a list of scopes.
:param scopes: A list of scopes
:param use_quote: Boolean flag indicating whether the string should be quoted
:return: Scopes as a string
"""
scopes_as_string = Scope.separator.join(scopes)
if use_quote:
return quote(scopes_as_string)
return scopes_as_string
def json_error_response(error, response, status_code=400):
"""
Formats an error as a response containing a JSON body.
"""
msg = {"error": error.error, "error_description": error.explanation}
response.status_code = status_code
response.add_header("Content-Type", "application/json")
response.body = json.dumps(msg)
return response
def json_success_response(data, response):
"""
Formats the response of a successful token request as JSON.
Also adds default headers and status code.
"""
response.body = json.dumps(data)
response.status_code = 200
response.add_header("Content-Type", "application/json")
response.add_header("Cache-Control", "no-store")
response.add_header("Pragma", "no-cache")
class ResponseTypeGrant(object):
def error_response(self, response):
pass
[docs]class Scope(object):
"""
Handling of the "scope" parameter in a request.
If ``available`` and ``default`` are both ``None``, the "scope" parameter
is ignored (the default).
:param available: A list of strings each defining one supported scope.
:param default: Value to fall back to in case no scope is present in a
request.
"""
separator = " "
def __init__(self, available=None, default=None):
self.scopes = []
self.send_back = False
if isinstance(available, list):
self.available_scopes = available
else:
self.available_scopes = []
self.default = default
def compare(self, previous_scopes):
"""
Compares the scopes read from request with previously issued scopes.
:param previous_scopes: A list of scopes.
:return: ``True``
"""
for scope in self.scopes:
if scope not in previous_scopes:
raise OAuthInvalidError(
error="invalid_scope",
explanation="Invalid scope parameter in request")
return True
[docs] def parse(self, request, source):
"""
Parses scope value in given request.
Expects the value of the "scope" parameter in request to be a string
where each requested scope is separated by a white space::
# One scope requested
"profile_read"
# Multiple scopes
"profile_read profile_write"
:param request: An instance of :class:`oauth2.web.Request`.
:param source: Where to read the scope from. Pass "body" in case of a
application/x-www-form-urlencoded body and "query" in
case the scope is supplied as a query parameter in the
URL of a request.
"""
if source == "body":
req_scope = request.post_param("scope")
elif source == "query":
req_scope = request.get_param("scope")
else:
raise ValueError("Unknown scope source '" + source + "'")
if req_scope is None:
if self.default is not None:
self.scopes = [self.default]
self.send_back = True
return
elif len(self.available_scopes) != 0:
raise OAuthInvalidError(
error="invalid_scope",
explanation="Missing scope parameter in request")
else:
return
req_scopes = req_scope.split(self.separator)
self.scopes = [scope for scope in req_scopes
if scope in self.available_scopes]
if len(self.scopes) == 0 and self.default is not None:
self.scopes = [self.default]
self.send_back = True
[docs]class ScopeGrant(object):
"""
Handling of scopes in the OAuth 2.0 flow.
Inherited by all grants that need to support scopes.
:param default_scope: The scope identifier that is returned by default.
(optional)
:param scopes: A list of strings identifying the scopes that the
grant supports.
:param scope_class: The class that does the actual handling in a request.
Default: :class:`oauth2.grant.Scope`.
"""
def __init__(self, default_scope=None, scopes=None, scope_class=Scope,
**kwargs):
self.default_scope = default_scope
self.scopes = scopes
self.scope_class = scope_class
super(ScopeGrant, self).__init__(**kwargs)
def _create_scope_handler(self):
return self.scope_class(available=self.scopes,
default=self.default_scope)
class GrantHandler(object):
"""
Base class every oauth2 handler can extend.
"""
def process(self, request, response, environ):
"""
Handles the logic of how a user gets an access token.
This includes steps like calling the implementation of a `SiteAdapter`
if the user is authorized or generating a new access token.
This method uses data read in `read_validate_params`.
"""
raise NotImplementedError
def read_validate_params(self, request):
"""
Reads and validates the incoming data.
"""
raise NotImplementedError
def handle_error(self, error, response):
"""
Takes all the actions necessary to return an error response in the
format defined for a specific grant handler.
"""
raise NotImplementedError
[docs]class GrantHandlerFactory(object):
"""
Base class every handler factory can extend.
This class defines the basic interface of each Grant.
"""
def __call__(self, request, server):
raise NotImplementedError
class AuthRequestMixin(object):
"""
Generalization of reading and validating an incoming request used by
`oauth2.grant.AuthorizationCodeAuthHandler` and
`oauth2.grant.ImplicitGrantHandler`.
"""
def __init__(self, client_authenticator, scope_handler, token_generator,
**kwargs):
self.client = None
self.state = None
self.client_authenticator = client_authenticator
self.scope_handler = scope_handler
self.token_generator = token_generator
super(AuthRequestMixin, self).__init__(**kwargs)
def read_validate_params(self, request):
"""
Reads and validates data in an incoming request as required by
the Authorization Request of the Authorization Code Grant and the
Implicit Grant.
"""
self.client = self.client_authenticator.by_identifier(request)
response_type = request.get_param("response_type")
if self.client.response_type_supported(response_type) is False:
raise OAuthInvalidError(error="unauthorized_client")
self.state = request.get_param("state")
self.scope_handler.parse(request, "query")
return True
class AuthorizeMixin(object):
"""
Used by all grants that involve user interaction.
"""
def __init__(self, site_adapter, **kwargs):
self.site_adapter = site_adapter
super(AuthorizeMixin, self).__init__(**kwargs)
def authorize(self, request, response, environ, scopes):
"""
Controls all steps to authorize a request by a user.
:param request: The incoming :class:`oauth2.web.Request`
:param response: The :class:`oauth2.web.Response` that will be
returned eventually
:param environ: The environment variables of this request
:param scopes: The scopes requested by an application
:return: A tuple containing (`dict`, user_id) or the response.
"""
if self.site_adapter.user_has_denied_access(request) is True:
raise OAuthInvalidError(error="access_denied",
explanation="Authorization denied by user")
try:
result = self.site_adapter.authenticate(request, environ, scopes,
self.client)
return self.sanitize_return_value(result)
except UserNotAuthenticated:
return self.site_adapter.render_auth_page(request, response,
environ, scopes,
self.client)
@staticmethod
def sanitize_return_value(value):
if isinstance(value, tuple) and len(value) is 2:
return value
return value, None
class AccessTokenMixin(object):
"""
Used by grants that handle refresh token and unique token.
"""
def __init__(self, access_token_store, token_generator,
unique_token=False, **kwargs):
self.access_token_store = access_token_store
self.token_generator = token_generator
self.unique_token = unique_token
super(AccessTokenMixin, self).__init__(**kwargs)
def create_token(self, client_id, data, grant_type, scopes, user_id):
if self.unique_token:
if user_id is None:
raise UserIdentifierMissingError
try:
access_token = self.access_token_store. \
fetch_existing_token_of_user(
client_id,
grant_type,
user_id)
if (access_token.scopes == scopes
and access_token.is_expired() is False):
token_data = {"access_token": access_token.token,
"token_type": "Bearer"}
if access_token.refresh_token is not None:
token_data["refresh_token"] = access_token.refresh_token
token_data["expires_in"] = access_token.expires_in
return token_data
except AccessTokenNotFound:
pass
token_data = self.token_generator.create_access_token_data(data, grant_type, user_id)
access_token = AccessToken(client_id=client_id, data=data,
grant_type=grant_type,
token=token_data["access_token"],
scopes=scopes,
user_id=user_id)
if "refresh_token" in token_data:
expires_at = int(time.time()) + token_data["expires_in"]
access_token.expires_at = expires_at
access_token.refresh_token = token_data["refresh_token"]
refresh_expires_in = self.token_generator.refresh_expires_in
refresh_expires_at = int(time.time()) + refresh_expires_in
access_token.refresh_expires_at = refresh_expires_at
self.access_token_store.save_token(access_token)
return token_data
[docs]class SiteAdapterMixin(object):
"""
Mixed in by Grant classes that require a site adapter.
A concrete class must set the class attribute ``site_adapter_class`` that
contains a reference to the site adapter class that this class expects.
"""
site_adapter_class = None
def __init__(self, site_adapter, **kwargs):
if isinstance(site_adapter, self.site_adapter_class) is False:
raise InvalidSiteAdapter(
"Site adapter must inherit from class '{0}'"
.format(self.site_adapter_class.__name__)
)
self.site_adapter = site_adapter
super(SiteAdapterMixin, self).__init__(**kwargs)
class AuthorizationCodeAuthHandler(AuthorizeMixin, AuthRequestMixin,
GrantHandler):
"""
Implementation of the first step of the Authorization Code Grant
(three-legged).
"""
token_expiration = 600
def __init__(self, auth_token_store, **kwargs):
self.auth_code_store = auth_token_store
super(AuthorizationCodeAuthHandler, self).__init__(**kwargs)
def process(self, request, response, environ):
"""
Generates a new authorization token.
A form to authorize the access of the application can be displayed with
the help of `oauth2.web.SiteAdapter`.
"""
data = self.authorize(request, response, environ,
self.scope_handler.scopes)
if isinstance(data, Response):
return data
code = self.token_generator.generate()
expires = int(time.time()) + self.token_expiration
auth_code = AuthorizationCode(client_id=self.client.identifier,
code=code, expires_at=expires,
redirect_uri=self.client.redirect_uri,
scopes=self.scope_handler.scopes,
data=data[0], user_id=data[1])
self.auth_code_store.save_code(auth_code)
response.add_header("Location", self._generate_location(code))
response.body = ""
response.status_code = 302
return response
def handle_error(self, error, response):
"""
Redirects the client in case an error in the auth process occurred.
"""
query_params = {"error": error.error}
query = urlencode(query_params)
location = "%s?%s" % (self.client.redirect_uri, query)
response.status_code = 302
response.body = ""
response.add_header("Location", location)
return response
def _generate_location(self, code):
query = "code=" + code
if self.state is not None:
query += "&state=" + quote(self.state)
return "%s?%s" % (self.client.redirect_uri, query)
class AuthorizationCodeTokenHandler(AccessTokenMixin, GrantHandler):
"""
Implementation of the second step of the Authorization Code Grant
(three-legged).
"""
def __init__(self, auth_token_store, client_authenticator, **kwargs):
self.client = None
self.code = None
self.data = {}
self.redirect_uri = None
self.scopes = []
self.user_id = None
self.auth_code_store = auth_token_store
self.client_authenticator = client_authenticator
super(AuthorizationCodeTokenHandler, self).__init__(**kwargs)
def read_validate_params(self, request):
"""
Reads and validates the data from the incoming request.
A valid request is issued via POST consists of the following form-encoded body:
client_id - Identifier of the requesting client (required)
client_secret - Secret phrase generated by the auth system (required)
code - Authorization code acquired in the Authorization Request (required)
redirect_uri - URI that the OAuth2 server should redirect to (optional)
"""
self._read_params(request)
self._validate_code()
return True
def process(self, request, response, environ):
"""
Generates a new access token and returns it.
Returns the access token and the type of the token as JSON.
Calls `oauth2.store.AccessTokenStore` to persist the token.
"""
token_data = self.create_token(
client_id=self.client.identifier,
data=self.data,
grant_type=AuthorizationCodeGrant.grant_type,
scopes=self.scopes,
user_id=self.user_id)
self.auth_code_store.delete_code(self.code)
if self.scopes:
token_data["scope"] = encode_scopes(self.scopes)
json_success_response(data=token_data, response=response)
return response
def handle_error(self, error, response):
return json_error_response(error, response)
def _read_params(self, request):
self.client = self.client_authenticator.by_identifier_secret(request)
self.code = request.post_param("code")
self.redirect_uri = request.post_param("redirect_uri")
if self.code is None or self.redirect_uri is None:
raise OAuthInvalidError(
error="invalid_request",
explanation="Missing required parameter in request")
try:
self.client.redirect_uri = self.redirect_uri
except RedirectUriUnknown:
raise OAuthInvalidError(
error="invalid_request",
explanation="Invalid redirect_uri parameter")
def _validate_code(self):
try:
stored_code = self.auth_code_store.fetch_by_code(self.code)
except AuthCodeNotFound:
raise OAuthInvalidError(
error="invalid_request",
explanation="Invalid authorization code parameter")
if stored_code.code != self.code:
raise OAuthInvalidError(
error="invalid_grant",
explanation="Invalid code parameter in request")
if stored_code.redirect_uri != self.redirect_uri:
raise OAuthInvalidError(
error="invalid_request",
explanation="Invalid redirect_uri parameter")
if stored_code.is_expired():
raise OAuthInvalidError(
error="invalid_grant",
explanation="Authorization code has expired")
self.data = stored_code.data
self.scopes = stored_code.scopes
self.user_id = stored_code.user_id
[docs]class AuthorizationCodeGrant(GrantHandlerFactory, ScopeGrant,
SiteAdapterMixin):
"""
Implementation of the Authorization Code Grant auth flow.
This is a three-legged OAuth process.
Register an instance of this class with
:class:`oauth2.AuthorizationController` like this::
auth_controller = AuthorizationController()
auth_controller.add_grant_type(AuthorizationCodeGrant())
.. versionchanged:: 1.0.0
Require parameter ``site_adapter``.
"""
grant_type = "authorization_code"
site_adapter_class = AuthorizationCodeGrantSiteAdapter
def __init__(self, unique_token=False, expires_in=0, **kwargs):
self.unique_token = unique_token
self.expires_in = expires_in
super(AuthorizationCodeGrant, self).__init__(**kwargs)
def __call__(self, request, server):
"""
:param request: Incoming request
:type request: oauth2.web.Request
:param server: The OAuth2 provider instance
.. versionchanged:: 1.0.0
Check the HTTP method of a request
"""
if (request.method == "POST"
and request.post_param("grant_type") == "authorization_code"
and request.path == server.token_path):
return AuthorizationCodeTokenHandler(
access_token_store=server.access_token_store,
auth_token_store=server.auth_code_store,
client_authenticator=server.client_authenticator,
token_generator=server.token_generator,
unique_token=self.unique_token)
if (request.method == "GET"
and request.get_param("response_type") == "code"
and request.path == server.authorize_path):
scope_handler = self._create_scope_handler()
return AuthorizationCodeAuthHandler(
auth_token_store=server.auth_code_store,
client_authenticator=server.client_authenticator,
scope_handler=scope_handler,
site_adapter=self.site_adapter,
token_generator=server.token_generator)
return None
[docs]class ImplicitGrant(GrantHandlerFactory, ScopeGrant, SiteAdapterMixin):
"""
Implementation of the Implicit Grant auth flow.
This is a three-legged OAuth process.
Register an instance of this class with
:class:`oauth2.AuthorizationController` like this::
auth_controller = AuthorizationController()
auth_controller.add_grant_type(ImplicitGrant())
.. versionchanged:: 1.0.0
Require parameter ``site_adapter``.
"""
grant_type = "implicit"
site_adapter_class = ImplicitGrantSiteAdapter
def __call__(self, request, server):
response_type = request.get_param("response_type")
if (response_type == "token"
and request.path == server.authorize_path):
return ImplicitGrantHandler(
access_token_store=server.access_token_store,
client_authenticator=server.client_authenticator,
scope_handler=self._create_scope_handler(),
site_adapter=self.site_adapter,
token_generator=server.token_generator)
return None
class ImplicitGrantHandler(AuthorizeMixin, AuthRequestMixin, GrantHandler):
def __init__(self, access_token_store, **kwargs):
self.access_token_store = access_token_store
super(ImplicitGrantHandler, self).__init__(**kwargs)
def process(self, request, response, environ):
data = self.authorize(request, response, environ,
self.scope_handler.scopes)
if isinstance(data, Response):
return data
token = self.token_generator.generate()
access_token = AccessToken(client_id=self.client.identifier,
grant_type=ImplicitGrant.grant_type,
token=token, data=data[0],
scopes=self.scope_handler.scopes)
self.access_token_store.save_token(access_token)
return self._redirect_access_token(response, token)
def handle_error(self, error, response):
redirect_location = "%s#error=%s" % (self.client.redirect_uri,
error.error)
response.add_header("Location", redirect_location)
response.body = ""
response.status_code = 302
return response
def _redirect_access_token(self, response, token):
uri_with_fragment = "{0}#access_token={1}&token_type=bearer". \
format(self.client.redirect_uri, token)
if self.state is not None:
uri_with_fragment += "&state=" + quote(self.state)
if self.scope_handler.send_back is True:
scopes_as_string = encode_scopes(self.scope_handler.scopes,
use_quote=True)
uri_with_fragment += "&scope=" + scopes_as_string
response.status_code = 302
response.add_header("Location", uri_with_fragment)
response.content = ""
return response
[docs]class ResourceOwnerGrant(GrantHandlerFactory, ScopeGrant, SiteAdapterMixin):
"""
Implementation of the Resource Owner Password Credentials Grant auth flow.
In this Grant a user provides a user name and a password.
An access token is issued if the auth server was able to verify the user
by her credentials.
Register an instance of this class with
:class:`oauth2.AuthorizationController` like this::
auth_controller = AuthorizationController()
auth_controller.add_grant_type(ResourceOwnerGrant())
.. versionchanged:: 1.0.0
Require parameter ``site_adapter``.
"""
grant_type = "password"
site_adapter_class = ResourceOwnerGrantSiteAdapter
def __init__(self, unique_token=False, expires_in=0, **kwargs):
self.unique_token = unique_token
self.expires_in = expires_in
super(ResourceOwnerGrant, self).__init__(**kwargs)
def __call__(self, request, server):
"""
Checks if the incoming request can be handled by the
ResourceOwnerGrantHandler and returns an instance of it.
"""
if request.post_param("grant_type") != self.grant_type:
return None
return ResourceOwnerGrantHandler(
access_token_store=server.access_token_store,
client_authenticator=server.client_authenticator,
scope_handler=self._create_scope_handler(),
site_adapter=self.site_adapter,
token_generator=server.token_generator,
unique_token=self.unique_token)
class ResourceOwnerGrantHandler(GrantHandler, AccessTokenMixin):
"""
Class for handling Resource Owner authorization requests.
See http://tools.ietf.org/html/rfc6749#section-4.3
"""
OWNER_NOT_AUTHENTICATED = "Unable to authenticate resource owner"
def __init__(self, client_authenticator, scope_handler, site_adapter,
**kwargs):
"""
:param client_authenticator: Client authenticator
:type client_authenticator: oauth2.client_authenticator.ClientAuthenticator
:param scope_handler: Scope handler
:type scope_handler: oauth2.grant.Scope
:param site_adapter: Site adapter
:type site_adapter: oauth2.web.SiteAdapter
"""
self.client_authenticator = client_authenticator
self.scope_handler = scope_handler
self.site_adapter = site_adapter
self.client = None
self.password = None
self.username = None
super(ResourceOwnerGrantHandler, self).__init__(**kwargs)
def process(self, request, response, environ):
"""
Takes the incoming request, asks the concrete SiteAdapter to validate
it and issues a new access token that is returned to the client on
successful validation.
"""
try:
data = self.site_adapter.authenticate(request, environ,
self.scope_handler.scopes,
self.client)
data = AuthorizeMixin.sanitize_return_value(data)
except UserNotAuthenticated:
raise OAuthInvalidError(error="invalid_client",
explanation=self.OWNER_NOT_AUTHENTICATED)
if isinstance(data, Response):
return data
token_data = self.create_token(
client_id=self.client.identifier,
data=data[0],
grant_type=ResourceOwnerGrant.grant_type,
scopes=self.scope_handler.scopes,
user_id=data[1])
if self.scope_handler.send_back:
token_data["scope"] = encode_scopes(self.scope_handler.scopes)
json_success_response(data=token_data, response=response)
return response
def read_validate_params(self, request):
"""
Checks if all incoming parameters meet the expected values.
"""
self.client = self.client_authenticator.by_identifier_secret(request)
self.password = request.post_param("password")
self.username = request.post_param("username")
self.scope_handler.parse(request=request, source="body")
return True
def handle_error(self, error, response):
status_code = 400
if error.explanation == self.OWNER_NOT_AUTHENTICATED:
status_code = 401
return json_error_response(error, response, status_code=status_code)
[docs]class RefreshToken(GrantHandlerFactory, ScopeGrant):
"""
Handles requests for refresh tokens as defined in
http://tools.ietf.org/html/rfc6749#section-6.
Adding a Refresh Token to the :class:`oauth2.AuthorizationController` like
this::
auth_controller = AuthorizationController()
auth_controller.add_grant_type(ResourceOwnerGrant(tokens_expire=600))
auth_controller.add_grant_type(RefreshToken(tokens_expire=1200))
will cause :class:`oauth2.grant.AuthorizationCodeGrant` and
:class:`oauth2.grant.ResourceOwnerGrant` to include a refresh token and
expiration in the response.
If tokens_expire == 0, the tokens will never expire.
"""
grant_type = "refresh_token"
def __init__(self, expires_in, reissue_refresh_tokens=False, **kwargs):
self.refresh_expires_in = expires_in
self.reissue_refresh_tokens = reissue_refresh_tokens
super(RefreshToken, self).__init__(**kwargs)
def __call__(self, request, server):
"""
Determines if the current request requests a refresh token.
:return: An instance of :class:`RefreshTokenHandler`.
"""
if request.path != server.token_path:
return None
if request.post_param("grant_type") != "refresh_token":
return None
return RefreshTokenHandler(
access_token_store=server.access_token_store,
client_authenticator=server.client_authenticator,
scope_handler=self._create_scope_handler(),
token_generator=server.token_generator,
reissue_refresh_tokens=self.reissue_refresh_tokens
)
class RefreshTokenHandler(GrantHandler):
"""
Validates an incoming request and issues a new access token.
"""
def __init__(self, access_token_store, client_authenticator,
scope_handler, token_generator,
reissue_refresh_tokens=False):
self.access_token_store = access_token_store
self.client_authenticator = client_authenticator
self.scope_handler = scope_handler
self.token_generator = token_generator
self.client = None
self.data = {}
self.refresh_grant_type = None
self.refresh_token = None
self.user_id = None
self.reissue_refresh_tokens = reissue_refresh_tokens
def process(self, request, response, environ):
"""
Create a new access token.
:param request: The incoming :class:`oauth2.web.Request`.
:param response: The :class:`oauth2.web.Response` that will be returned
to the client.
:param environ: A ``dict`` containing data of the environment.
:return: :class:`oauth2.web.Response`
"""
token_data = self.token_generator.create_access_token_data(self.data, self.refresh_grant_type, self.user_id)
expires_at = int(time.time()) + token_data["expires_in"]
access_token = AccessToken(client_id=self.client.identifier,
token=token_data["access_token"],
grant_type=self.refresh_grant_type,
data=self.data, expires_at=expires_at,
scopes=self.scope_handler.scopes,
user_id=self.user_id)
if self.reissue_refresh_tokens:
self.access_token_store.delete_refresh_token(self.refresh_token)
access_token.refresh_token = token_data["refresh_token"]
refresh_expires_in = self.token_generator.refresh_expires_in
refresh_expires_at = int(time.time()) + refresh_expires_in
access_token.refresh_expires_at = refresh_expires_at
else:
del token_data["refresh_token"]
self.access_token_store.save_token(access_token)
json_success_response(data=token_data, response=response)
return response
def read_validate_params(self, request):
"""
Validate the incoming request.
:param request: The incoming :class:`oauth2.web.Request`.
:return: Returns ``True`` if data is valid.
:raises: :class:`oauth2.error.OAuthInvalidError`
"""
self.refresh_token = request.post_param("refresh_token")
if self.refresh_token is None:
raise OAuthInvalidError(
error="invalid_request",
explanation="Missing refresh_token in request body")
self.client = self.client_authenticator.by_identifier_secret(request)
try:
access_token = self.access_token_store.fetch_by_refresh_token(
self.refresh_token
)
except AccessTokenNotFound:
raise OAuthInvalidError(error="invalid_request",
explanation="Invalid refresh token")
refresh_token_expires_at = access_token.refresh_expires_at
self.refresh_grant_type = access_token.grant_type
if refresh_token_expires_at != 0 and \
refresh_token_expires_at < int(time.time()):
raise OAuthInvalidError(error="invalid_request",
explanation="Invalid refresh token")
self.data = access_token.data
self.user_id = access_token.user_id
self.scope_handler.parse(request, "body")
self.scope_handler.compare(access_token.scopes)
return True
def handle_error(self, error, response):
return json_error_response(error, response)
class ClientCredentialsGrant(GrantHandlerFactory, ScopeGrant):
grant_type = "client_credentials"
def __call__(self, request, server):
if request.path != server.token_path:
return None
if request.post_param("grant_type") == self.grant_type:
return ClientCredentialsHandler(
access_token_store=server.access_token_store,
client_authenticator=server.client_authenticator,
scope_handler=self._create_scope_handler(),
token_generator=server.token_generator)
return None
class ClientCredentialsHandler(GrantHandler):
def __init__(self, access_token_store, client_authenticator,
scope_handler, token_generator):
self.access_token_store = access_token_store
self.client_authenticator = client_authenticator
self.scope_handler = scope_handler
self.token_generator = token_generator
self.client = None
def process(self, request, response, environ):
body = {"token_type": "Bearer"}
token = self.token_generator.generate()
expires_in = self.token_generator.expires_in.get(ClientCredentialsGrant.grant_type, None)
if expires_in is None:
expires_at = None
else:
expires_at = int(time.time()) + expires_in
access_token = AccessToken(
client_id=self.client.identifier,
grant_type=ClientCredentialsGrant.grant_type,
token=token,
expires_at=expires_at,
scopes=self.scope_handler.scopes)
self.access_token_store.save_token(access_token)
body["access_token"] = token
if expires_in is not None:
body["expires_in"] = expires_in
if self.scope_handler.send_back:
body["scope"] = encode_scopes(self.scope_handler.scopes)
json_success_response(data=body, response=response)
return response
def read_validate_params(self, request):
self.client = self.client_authenticator.by_identifier_secret(request)
self.scope_handler.parse(request=request, source="body")
def handle_error(self, error, response):
return json_error_response(error, response)