
    ~h                     J    d dl mZ ddlmZ ddlmZ ddlmZ  G d de      Zy)	    )default_json_headers   )InvalidRequestError)TokenEndpoint)UnsupportedTokenTypeErrorc                   >    e Zd ZdZdZd Zd Zd Zd Zd Z	d Z
d	 Zy
)IntrospectionEndpointzImplementation of introspection endpoint which is described in
    `RFC7662`_.

    .. _RFC7662: https://tools.ietf.org/html/rfc7662
    introspectionc                     | j                  ||       | j                  |j                  d   |j                  j                  d            }|r| j	                  |||      r|S yy)aw  The protected resource calls the introspection endpoint using an HTTP
        ``POST`` request with parameters sent as
        "application/x-www-form-urlencoded" data. The protected resource sends a
        parameter representing the token along with optional parameters
        representing additional context that is known by the protected resource
        to aid the authorization server in its response.

        token
            **REQUIRED**  The string value of the token. For access tokens, this
            is the ``access_token`` value returned from the token endpoint
            defined in OAuth 2.0. For refresh tokens, this is the
            ``refresh_token`` value returned from the token endpoint as defined
            in OAuth 2.0.

        token_type_hint
            **OPTIONAL**  A hint about the type of the token submitted for
            introspection.
        tokentoken_type_hintN)check_paramsquery_tokenformgetcheck_permission)selfrequestclientr   s       `/opt/mcp/mcp-sentiment/venv/lib/python3.12/site-packages/authlib/oauth2/rfc7662/introspection.pyauthenticate_tokenz(IntrospectionEndpoint.authenticate_token   se    & 	'6*  LL!7<<#3#34E#F
 T**5&'BL C5    c                     |j                   }d|vr
t               |j                  d      }|r|| j                  vr
t	               y y )Nr   r   )r   r   r   SUPPORTED_TOKEN_TYPESr   )r   r   r   paramshints        r   r   z"IntrospectionEndpoint.check_params,   sK    & %''zz+,D : ::+-- ;4r   c                 |    | j                  |      }| j                  ||      }| j                  |      }d|t        fS )zpValidate introspection request and create the response.

        :returns: (status_code, body, headers)
           )authenticate_endpoint_clientr   create_introspection_payloadr   )r   r   r   r   bodys        r   create_endpoint_responsez.IntrospectionEndpoint.create_endpoint_response5   sH     227; ''8 007D...r   c                     |sddiS |j                         s|j                         rddiS | j                  |      }d|vrd|d<   |S )NactiveFT)
is_expired
is_revokedintrospect_token)r   r   payloads      r   r    z2IntrospectionEndpoint.create_introspection_payloadE   s[    
 e$$!1!1!3e$$''.7" $GHr   c                     t               )aU  Check if the request has permission to introspect the token. Developers
        MUST implement this method::

            def check_permission(self, token, client, request):
                # only allow a special client to introspect the token
                return client.client_id == "introspection_client"

        :return: bool
        NotImplementedError)r   r   r   r   s       r   r   z&IntrospectionEndpoint.check_permissionS   s     "##r   c                     t               )a  Get the token from database/storage by the given token string.
        Developers should implement this method::

            def query_token(self, token_string, token_type_hint):
                if token_type_hint == "access_token":
                    tok = Token.query_by_access_token(token_string)
                elif token_type_hint == "refresh_token":
                    tok = Token.query_by_refresh_token(token_string)
                else:
                    tok = Token.query_by_access_token(token_string)
                    if not tok:
                        tok = Token.query_by_refresh_token(token_string)
                return tok
        r*   )r   token_stringr   s      r   r   z!IntrospectionEndpoint.query_token_   s     "##r   c                     t               )a  Read given token and return its introspection metadata as a
        dictionary following `Section 2.2`_::

            def introspect_token(self, token):
                return {
                    "active": True,
                    "client_id": token.client_id,
                    "token_type": token.token_type,
                    "username": get_token_username(token),
                    "scope": token.get_scope(),
                    "sub": get_token_user_sub(token),
                    "aud": token.client_id,
                    "iss": "https://server.example.com/",
                    "exp": token.expires_at,
                    "iat": token.issued_at,
                }

        .. _`Section 2.2`: https://tools.ietf.org/html/rfc7662#section-2.2
        r*   )r   r   s     r   r'   z&IntrospectionEndpoint.introspect_tokenp   s    ( "##r   N)__name__
__module____qualname____doc__ENDPOINT_NAMEr   r   r"   r    r   r   r'    r   r   r	   r	      s1     $M4./ 
$$"$r   r	   N)authlib.constsr   rfc6749r   r   r   r	   r4   r   r   <module>r7      s    / ) # /|$M |$r   