import json
import logging
import random
import time
from typing import Callable, Optional, Union, cast
from requests import Response, Session
from requests.exceptions import ConnectionError
from jira.exceptions import JIRAError
logging.getLogger("jira").addHandler(logging.NullHandler())
[docs]def raise_on_error(r: Optional[Response], verb="???", **kwargs):
"""Handle errors from a Jira Request
Args:
r (Optional[Response]): Response from Jira request
verb (Optional[str]): Request type, e.g. POST. Defaults to "???".
Raises:
JIRAError: If Response is None
JIRAError: for unhandled 400 status codes.
JIRAError: for unhandled 200 status codes.
"""
request = kwargs.get("request", None)
# headers = kwargs.get('headers', None)
if r is None:
raise JIRAError(None, **kwargs)
if r.status_code >= 400:
error = ""
if r.status_code == 403 and "x-authentication-denied-reason" in r.headers:
error = r.headers["x-authentication-denied-reason"]
elif r.text:
try:
response = json.loads(r.text)
if "message" in response:
# Jira 5.1 errors
error = response["message"]
elif "errorMessages" in response and len(response["errorMessages"]) > 0:
# Jira 5.0.x error messages sometimes come wrapped in this array
# Sometimes this is present but empty
errorMessages = response["errorMessages"]
if isinstance(errorMessages, (list, tuple)):
error = errorMessages[0]
else:
error = errorMessages
# Catching only 'errors' that are dict. See https://github.com/pycontribs/jira/issues/350
elif (
"errors" in response
and len(response["errors"]) > 0
and isinstance(response["errors"], dict)
):
# Jira 6.x error messages are found in this array.
error_list = response["errors"].values()
error = ", ".join(error_list)
else:
error = r.text
except ValueError:
error = r.text
raise JIRAError(
error,
status_code=r.status_code,
url=r.url,
request=request,
response=r,
**kwargs,
)
# for debugging weird errors on CI
if r.status_code not in [200, 201, 202, 204]:
raise JIRAError(
status_code=r.status_code, request=request, response=r, **kwargs
)
# testing for the bug exposed on
# https://answers.atlassian.com/questions/11457054/answers/11975162
if (
r.status_code == 200
and len(r.content) == 0
and "X-Seraph-LoginReason" in r.headers
and "AUTHENTICATED_FAILED" in r.headers["X-Seraph-LoginReason"]
):
pass
[docs]class ResilientSession(Session):
"""This class is supposed to retry requests that do return temporary errors.
At this moment it supports: 502, 503, 504
"""
[docs] def __init__(self, timeout=None):
self.max_retries = 3
self.max_retry_delay = 60
self.timeout = timeout
super().__init__()
# Indicate our preference for JSON to avoid https://bitbucket.org/bspeakmon/jira-python/issue/46 and https://jira.atlassian.com/browse/JRA-38551
self.headers.update({"Accept": "application/json,*.*;q=0.9"})
def __recoverable(
self,
response: Optional[Union[ConnectionError, Response]],
url: str,
request,
counter: int = 1,
):
msg = str(response)
if isinstance(response, ConnectionError):
logging.warning(
f"Got ConnectionError [{response}] errno:{response.errno} on {request} {url}\n{vars(response)}\n{response.__dict__}"
)
if isinstance(response, Response):
if response.status_code in [502, 503, 504, 401]:
# 401 UNAUTHORIZED still randomly returned by Atlassian Cloud as of 2017-01-16
msg = f"{response.status_code} {response.reason}"
# 2019-07-25: Disabled recovery for codes above^
return False
elif not (
response.status_code == 200
and len(response.content) == 0
and "X-Seraph-LoginReason" in response.headers
and "AUTHENTICATED_FAILED" in response.headers["X-Seraph-LoginReason"]
):
return False
else:
msg = "Atlassian's bug https://jira.atlassian.com/browse/JRA-41559"
# Exponential backoff with full jitter.
delay = min(self.max_retry_delay, 10 * 2 ** counter) * random.random()
logging.warning(
"Got recoverable error from %s %s, will retry [%s/%s] in %ss. Err: %s"
% (request, url, counter, self.max_retries, delay, msg)
)
if isinstance(response, Response):
logging.debug("response.headers: %s", response.headers)
logging.debug("response.body: %s", response.content)
time.sleep(delay)
return True
def __verb(
self, verb: str, url: str, retry_data: Callable = None, **kwargs
) -> Response:
d = self.headers.copy()
d.update(kwargs.get("headers", {}))
kwargs["headers"] = d
# if we pass a dictionary as the 'data' we assume we want to send json
# data
data = kwargs.get("data", {})
if isinstance(data, dict):
data = json.dumps(data)
retry_number = 0
exception = None
response = None
while retry_number <= self.max_retries:
response = None
exception = None
try:
method = getattr(super(), verb.lower())
response = method(url, timeout=self.timeout, **kwargs)
if response.status_code >= 200 and response.status_code <= 299:
return response
except ConnectionError as e:
logging.warning(f"{e} while doing {verb.upper()} {url}")
exception = e
retry_number += 1
if retry_number <= self.max_retries:
response_or_exception = response if response is not None else exception
if self.__recoverable(
response_or_exception, url, verb.upper(), retry_number
):
if retry_data:
# if data is a stream, we cannot just read again from it,
# retry_data() will give us a new stream with the data
kwargs["data"] = retry_data()
continue
else:
break
if exception is not None:
raise exception
raise_on_error(response, verb=verb, **kwargs)
# after raise_on_error, only Response objects are allowed through
response = cast(Response, response) # tell mypy only Response-like are here
return response
[docs] def get(self, url: Union[str, bytes], **kwargs) -> Response: # type: ignore
return self.__verb("GET", str(url), **kwargs)
[docs] def post(self, url: Union[str, bytes], data=None, json=None, **kwargs) -> Response: # type: ignore
return self.__verb("POST", str(url), data=data, json=json, **kwargs)
[docs] def put(self, url: Union[str, bytes], data=None, **kwargs) -> Response: # type: ignore
return self.__verb("PUT", str(url), data=data, **kwargs)
[docs] def delete(self, url: Union[str, bytes], **kwargs) -> Response: # type: ignore
return self.__verb("DELETE", str(url), **kwargs)
[docs] def head(self, url: Union[str, bytes], **kwargs) -> Response: # type: ignore
return self.__verb("HEAD", str(url), **kwargs)
[docs] def patch(self, url: Union[str, bytes], data=None, **kwargs) -> Response: # type: ignore
return self.__verb("PATCH", str(url), data=data, **kwargs)
[docs] def options(self, url: Union[str, bytes], **kwargs) -> Response: # type: ignore
return self.__verb("OPTIONS", str(url), **kwargs)