import warnings
from time import sleep
from typing import Optional
from config_wrangler.config_templates.credentials import Credentials
import bi_etl.config.notifiers_config as notifiers_config
from bi_etl.notifiers.notifier_base import NotifierBase
[docs]
class Slack(NotifierBase):
[docs]
def __init__(self, config_section: notifiers_config.SlackNotifier, *, name: Optional[str] = None):
super().__init__(name=name)
self.config_section = config_section
try:
from slack_sdk import WebClient
self.log.debug("Using slack_sdk import")
self._client_version = 3
from slack_sdk.errors import SlackApiError
except ImportError:
try:
# noinspection PyUnresolvedReferences
from slack import WebClient
self.log.debug("Using WebClient v2+ import")
self._client_version = 2
from slack.errors import SlackApiError
except ImportError:
raise ImportError("Slack notifier requires either slack-sdk or slackclient to be installed")
self.SlackApiError = SlackApiError
# We need to support putting the token in keepass/keyring
# slack_token = self.config_section.token
if self.config_section.token is None or self.config_section.token == '':
slack_token_credentials = Credentials(
user_id='slack',
password_source=self.config_section.token_source,
raw_password=self.config_section.token,
keyring_section=self.config_section.keyring_section,
# keepass_config=self.config_section.keepass_config,
keepass=self.config_section.keepass,
keepass_group=self.config_section.keepass_group,
keepass_title=self.config_section.keepass_title,
)
config_section.add_child('slack_token_credentials', slack_token_credentials)
slack_token = slack_token_credentials.get_password()
else:
slack_token = self.config_section.token
self.slack_client = WebClient(slack_token)
self.slack_channel = self.config_section.channel
self.mention = self.config_section.mention
self._status_channel = None
self._status_ts = None
if self.slack_channel is None or self.slack_channel == 'OVERRIDE_THIS_SETTING':
self.log.warning("Slack channel not set. No slack messages will be sent.")
self.slack_channel = None
def _post_message(self, text: str, link_names: bool = False):
retry = True
result_data = None
while retry:
# https://api.slack.com/methods/chat.postMessage
try:
result = self.slack_client.chat_postMessage(
channel=self.slack_channel,
text=text,
link_names=link_names,
)
retry = False
try:
result_data = result.data
if result_data is None:
self.log.error(f"API result has result.data of None {result}")
except (AttributeError, TypeError):
result_data = None
self.log.error(f"API result did not include data: {result}")
except self.SlackApiError as e:
self.log.error(e)
if e.response['error'] == 'ratelimited':
self.log.info('Waiting for slack ratelimited to clear')
sleep(1.5)
# See retry loop above
else:
raise
return result_data
def _post_update(
self,
channel: str, # Note this does not accept the friendly channel names (see send results)
ts: str,
text: str,
link_names: bool = False
):
# Updates are considered non-critical so we don't retry
try:
self.slack_client.chat_update(
channel=channel,
ts=ts,
text=text,
link_names=link_names,
)
except self.SlackApiError as e:
self.log.error(e)
if e.response['error'] == 'ratelimited':
# Updates are considered non-critical so rate limiting means we skip the update
pass
else:
raise
[docs]
def send(self, subject, message, sensitive_message=None, attachment=None, throw_exception=False):
# Clear the status timestamp so that we create a new status message below this non-status message
self._status_ts = None
if self.slack_channel is not None and self.slack_channel != '':
if subject and message:
message_to_send = f"{subject}: {message}"
else:
if message:
message_to_send = message
else:
message_to_send = subject
if self.mention:
message_to_send += ' ' + self.mention
link_names = True
else:
link_names = False
self._post_message(text=message_to_send, link_names=link_names)
else:
self.log.info(f"Slack message not sent: {message}")
[docs]
def post_status(self, status_message):
"""
Send a temporary status messages that gets overwritten with the next status message that is sent.
Parameters
----------
status_message
Returns
-------
"""
if self._client_version == 1:
warnings.warn(f"Slack client v1 not supported for post_status")
return None
if self.slack_channel is None or self.slack_channel == '':
self.log.warning(f"slack_channel = '{self.slack_channel}'")
return None
if self._status_ts is None:
result_data = self._post_message(text=status_message)
if result_data is not None:
if 'ts' in result_data and 'channel' in result_data:
self._status_ts = result_data['ts']
self._status_channel = result_data['channel']
else:
self._post_update(
channel=self._status_channel,
ts=self._status_ts,
text=status_message
)