SSL証明書の監視

期限切れのSSL証明書は、依然としてシステム停止の主な原因の1つです。不意を突かれないよう、SSL証明書の有効期限を監視しましょう。

C05348A3-9AB8-42C9-A6E0-81DB3AC59FEB
           

この時代において、期限切れのSSL証明書は、大企業であっても、システム停止の主な原因の1つであると信じられますか?

Pythonを使用すると、SSL証明書の有効期限を簡単に監視できます。

このPythonスクリプトを使用して、SSL証明書の有効期限を確認し、有効期限が指定された日数以内の場合はSlackにアラートを投稿できます。スクリプトが実行されたことを通知するために、すべての証明書に問題がない場合は、メッセージもSlackに送信されます。

ファイルは、このGitHubリポジトリにもあります。

依存関係をインストールするには、以下を使用してください。

pip3 install --upgrade certifi
pip3 install --upgrade urllib3
pip3 install --upgrade PyYAML
pip3 install --upgrade requests

チェックするサーバーとポートのリストは、ssl.yamlファイルを編集することで構成できます。ホストごとに複数のポート番号を指定できます。Slack Webhook URLのうち https://hooks.slack.com/services/ より後ろの部分は、slack_api環境変数としてスクリプトに渡す必要があります。

www.bing.com:
  - 443
www.google.com:
  - 443

スクリプトはパフォーマンスのためにマルチスレッド化されており、HTTP再試行が有効になっています。スクリプトは、証明書の正確性または証明書チェーンを検証しないことに注意してください。

SSL証明書の残り有効期間がnumber_of_days(デフォルト値は60日)未満の場合、スクリプトは警告を発します。また、スクリプトは環境変数slack_apiを介してSlack Webhook URLが与えられることを想定しています(SlackのベースURL https://hooks.slack.com/services/ は含めないでください)。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import (absolute_import, division,
                        print_function, unicode_literals)
 
'''
For each URL and each port listed in the given configuration file, access the URL via HTTPS and retrieve the server's certificate.
Alert in Slack if the certificate is invalid or expiring soon.
 
'''
 
__author__ = "Videre Research, LLC"
__version__ = "1.0.2"
 
 
# I M P O R T S ###############################################################
 
import datetime
import os
import sys
import socket
import ssl
import yaml
import logging
import requests
import json
from threading import Thread
import certifi
 
if (sys.version_info > (3, 0)):
    from urllib.parse import urljoin
else:
    from urlparse import urljoin
 
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
 
 
# G L O B A L S ###############################################################


# Alert when a certificate has fewer than this many days of validity left.
number_of_days = 60

# Webhook path appended to slack_base_url; filled in by main() from the
# environment variable named by environment_variable_name.
webhookURL = None
slack_base_url = 'https://hooks.slack.com/services/'
environment_variable_name = 'slack_api'

# Presentation of the messages posted to Slack.
slack_user = 'SSL Certificate Validation'
slack_message_title = 'SSL Certificate Check'
slack_icon = ':shield:'

# Set to True by check_certificate() as soon as any check raises an alert.
hasErrors = False

# The list of hosts/ports to check lives next to this script.
filepath = os.path.join(os.path.dirname(os.path.abspath(__file__)), "ssl.yaml")

# Root logger writing INFO and above to the console (StreamHandler default).
logger = logging.getLogger()
logger.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
ch = logging.StreamHandler()
ch.setLevel(logging.INFO)
ch.setFormatter(formatter)
logger.addHandler(ch)
logger.info("-" * 80)
logger.info("Version:  %s" % (__version__))
logger.info("Path:    %s" % (os.path.realpath(__file__)))

# Shared TLS client context used by check_certificate().
context = ssl.create_default_context()
# CERT_REQUIRED is a verify_mode value. Assigning it to context.options (as
# this line used to) overwrote the default protocol option bitmask instead of
# configuring certificate verification.
context.verify_mode = ssl.CERT_REQUIRED
# NOTE(review): `&=` can only clear bits, so these two lines never enable CRL
# checking; with the stock default flags they appear to leave no verify flag
# set at all. Confirm whether `|=` was intended (CRL checking would also
# require CRLs to be loaded into the context).
context.verify_flags &= ssl.VERIFY_CRL_CHECK_LEAF
context.verify_flags &= ssl.VERIFY_CRL_CHECK_CHAIN
context.check_hostname = True
context.load_verify_locations(cafile=certifi.where())
 
 
# C O D E #####################################################################
 
 
def time_this(original_function):
    """Decorate a function so that the duration of each call is logged.

    The wrapper calls ``original_function`` with the arguments it receives,
    logs the elapsed wall-clock time at INFO level through the module-level
    ``logger`` and returns the wrapped function's result unchanged. If the
    wrapped function raises, the exception propagates and nothing is logged.

    ``functools.wraps`` keeps the wrapped function's ``__name__`` and
    ``__doc__`` on the returned wrapper.
    """
    import functools  # local import so the block does not rely on file-level imports

    @functools.wraps(original_function)
    def new_function(*args, **kwargs):
        before = datetime.datetime.now()
        x = original_function(*args, **kwargs)
        after = datetime.datetime.now()
        logger.info('Duration: %.4fs  %s' % ((after - before).total_seconds(), original_function))
        return x
    return new_function
 
 
def requests_retry_session(
    retries=3,
    backoff_factor=2.5,
    status_forcelist=(500, 502, 504),
    session=None,
):
    """Return a requests session that retries failed HTTP(S) requests.

    retries:          used as the total, read and connect retry limit.
    backoff_factor:   passed to the Retry policy as its back-off factor.
    status_forcelist: HTTP status codes handed to the Retry policy.
    session:          existing session to configure; a new one is created
                      when this is falsy.

    The same adapter instance is mounted for both the http:// and https://
    prefixes, and the configured session is returned.
    """
    if not session:
        session = requests.Session()

    retry_policy = Retry(
        total=retries,
        read=retries,
        connect=retries,
        backoff_factor=backoff_factor,
        status_forcelist=status_forcelist,
    )
    retrying_adapter = HTTPAdapter(max_retries=retry_policy)

    for prefix in ('http://', 'https://'):
        session.mount(prefix, retrying_adapter)
    return session
 
 
def get_environment_variable(myVariable):
    """Return the value of a required environment variable.

    The literal values "Y" and "N" are converted to True and False; any other
    non-empty value is returned as the string it was set to. When the
    variable is unset or empty an error is logged and the process exits with
    status 1.
    """
    raw_value = os.getenv(myVariable, '')
    if raw_value == "":
        logger.error('Environment variable ' + myVariable + ' not set, exiting')
        sys.exit(1)

    yes_no = {"Y": True, "N": False}
    return yes_no.get(raw_value, raw_value)
 
 
@time_this
def sendSlack(severity, body):
    """Send a given message to Slack.

    severity: one of 'Healthy', 'Minor', 'Major' or 'Critical'; it selects
              the attachment colour, and 'Major'/'Critical' also prefix the
              message title with a channel mention. Any other value gets a
              neutral colour and no mention.
    body:     text used as both the attachment text and its fallback.

    The payload is POSTed as JSON to the URL formed by joining
    ``slack_base_url`` with the module-level ``webhookURL``. Delivery is
    best-effort: connection errors and non-200 responses are logged and never
    raised. "Sent to Slack" is logged only when Slack answered with HTTP 200.
    Returns None.
    """
    # severity -> (attachment colour, mention prefix for the title)
    severity_styles = {
        'Healthy': ('36a64f', ''),
        'Minor': ('FFD700', ''),
        'Major': ('FF8C00', '<!channel|channel> '),
        'Critical': ('FF4500', '<!channel|channel> '),
    }
    color, alert = severity_styles.get(severity, ('2F4F4F', ''))

    headers = {'Content-Type': 'application/json', 'Accept': 'application/json'}
    data = {
        "username": slack_user,
        "icon_emoji": slack_icon,
        "text": alert + slack_message_title,
        "attachments": [
            {
                "fallback": body,
                "color": color,
                "text": body
            }
        ]
    }

    logger.info("Sending message to Slack")
    try:
        r = requests_retry_session().request(method='POST', url=urljoin(slack_base_url, webhookURL), data=json.dumps(data), headers=headers, timeout=25)
    except Exception as x:
        # Deliberately broad: a failed notification must not abort the checks.
        logger.error('Connection failed :( %s' % x.__class__.__name__)
        return

    if r.status_code == 200:
        logger.info(r.content)
        logger.info('Sent to Slack')
    else:
        logger.error('Error: %s' % r.status_code)
        logger.debug(r.headers)
        logger.debug(r.text)
 
 
def get_issuer(ssl_info):
    """Return the certificate issuer as "<commonName>, <organizationName>".

    ssl_info: certificate dictionary whose 'issuer' entry is a sequence of
              RDNs, each RDN being a sequence of (attribute, value) pairs.

    Every attribute of every RDN is inspected. A missing commonName or
    organizationName is rendered as an empty string, so an absent or empty
    issuer yields ", ". If the issuer value does not have the expected
    nested-pair shape, its ``str()`` representation is returned instead.
    """
    issuer = ssl_info.get('issuer', ()) if ssl_info else ()
    commonname = ""
    orgname = ""
    try:
        for rdn in issuer:
            for key, value in rdn:
                if key == 'organizationName':
                    orgname = value
                elif key == 'commonName':
                    commonname = value
    except (TypeError, ValueError):
        # Not a sequence of (attribute, value) pairs: show it verbatim.
        return str(issuer)
    return f'{commonname}, {orgname}'
 
 
def check_certificate(hostname, port):
    """Check the TLS certificate served at hostname:port and alert via Slack.

    Opens a TLS connection with the module-level ``context`` (10 second
    socket timeout), logs the negotiated TLS version, cipher and certificate
    issuer, and compares the certificate's 'notAfter' date with the current
    UTC time:

    * already expired                    -> 'Critical' Slack alert
    * fewer than ``number_of_days`` left -> 'Minor' Slack alert
    * any exception during the check     -> 'Major' Slack alert with the error
    * otherwise                          -> "Cert OK" is logged, no alert

    Whenever an alert is sent the module-level ``hasErrors`` flag is set to
    True. The socket is always closed. Returns None.
    """
    global hasErrors
    ssl_date_fmt = r'%b %d %H:%M:%S %Y %Z'
    conn = context.wrap_socket(
        socket.socket(socket.AF_INET),
        server_hostname=hostname,
    )
    conn.settimeout(10.0)

    slack_body = f"SSL Certificate: https://{hostname}:{port}\n"
    try:
        conn.connect((hostname, port))
        ssl_info = conn.getpeercert()

        logger.info(f"TLS Version: {conn.version()}")
        logger.info(f"TLS Cipher: {conn.cipher()[0]}")
        logger.info(f"Certificate Issuer: {get_issuer(ssl_info)}")
        # parse the string from the certificate into a Python datetime object
        expires = datetime.datetime.strptime(ssl_info['notAfter'], ssl_date_fmt)
        logger.info(f"SSL cert for {hostname} expires on {expires.isoformat()}")
        remaining = expires - datetime.datetime.utcnow()
        logger.info(remaining)
        if remaining < datetime.timedelta(days=0):
            # remaining is negative here, so remaining.days would print a
            # negative number; report the elapsed whole days instead.
            days_ago = (-remaining).days
            logger.info(f"Cert expired {days_ago} days ago")
            slack_body += f"Cert expired {days_ago} days ago"
            sendSlack('Critical', slack_body)
            hasErrors = True
        elif remaining < datetime.timedelta(days=number_of_days):
            logger.info(f"Cert expiring in {remaining.days} days")
            slack_body += f"Cert expiring in {remaining.days} days"
            sendSlack('Minor', slack_body)
            hasErrors = True
        else:
            logger.info("Cert OK")
    except Exception as e:
        # Any failure (connection, handshake, parsing) is itself reported.
        logger.error("Error {0}".format(str(e)))
        slack_body += "Error {0}".format(str(e))
        sendSlack('Major', slack_body)
        hasErrors = True
    finally:
        conn.close()
        logger.info('-'*30)
 
 
def main():
    """Main function.

    Reads the Slack webhook path from the required environment variable,
    loads the host/port mapping from the YAML file at ``filepath`` and runs
    ``check_certificate`` for every host/port pair, each in its own thread.
    All threads are joined before the result is evaluated; if no check set
    ``hasErrors``, a 'Healthy' message is sent to Slack.

    Exits with status 1 when the configuration file is missing or cannot be
    parsed, and with status 0 after all checks have completed. Because the
    checks run concurrently, their log lines may interleave.
    """
    global webhookURL

    webhookURL = get_environment_variable(environment_variable_name)

    if not os.path.isfile(filepath):
        logger.error('File not found: %s' % filepath)
        sys.exit(1)

    try:
        # SafeLoader: the configuration is plain data, never arbitrary objects.
        with open(filepath) as config_file:
            result = yaml.load(config_file, yaml.SafeLoader)
    except Exception as e:
        logger.error('YAML Error: %s' % e)
        sys.exit(1)

    logger.info(result)
    workers = []
    # An empty YAML file loads as None; treat it as "nothing to check".
    for server, ports in (result or {}).items():
        if ports is None:
            ports = []
        elif not isinstance(ports, (list, tuple)):
            # Allow "host: 443" as shorthand for a single-port list.
            ports = [ports]
        for port in ports:
            logger.info('Checking certificate on server %s port %s' % (server, port))
            # Pass the callable and its arguments separately; calling
            # check_certificate(...) here would run it in this thread and
            # hand Thread a target of None.
            worker = Thread(target=check_certificate, args=(server, port))
            worker.daemon = True
            worker.start()
            workers.append(worker)

    # Wait for every check, not just the last one started.
    for worker in workers:
        worker.join()

    if not hasErrors:
        sendSlack('Healthy', 'All certificates checked are OK')

    logger.info('*** DONE ***')
    sys.exit(0)
 
###############################################################################
 
 
# Run the certificate checks only when this file is executed as a script.
if __name__ == "__main__":
    main()
 
# E N D   O F   F I L E #######################################################

サンプル出力:

python3 check_certificate_expiration.py
2022-07-22 17:25:53,751 - INFO - --------------------------------------------------------------------------------
2022-07-22 17:25:53,751 - INFO - Version:  1.0.2
2022-07-22 17:25:53,751 - INFO - Path:    /Users/me/SSL-Certificate-Expiration-Check/check_certificate_expiration.py
2022-07-22 17:25:53,767 - INFO - {'www.bing.com': [443], 'www.google.com': [443]}
2022-07-22 17:25:53,767 - INFO - Checking certificate on server www.bing.com port 443
2022-07-22 17:25:53,872 - INFO - TLS Version: TLSv1.2
2022-07-22 17:25:53,872 - INFO - TLS Cipher: ECDHE-RSA-AES256-GCM-SHA384
2022-07-22 17:25:53,872 - INFO - Certificate Issuer: Microsoft RSA TLS CA 01, Microsoft Corporation
2022-07-22 17:25:53,883 - INFO - SSL cert for www.bing.com expires on 2022-12-10T01:15:41
2022-07-22 17:25:53,883 - INFO - 140 days, 3:49:47.116068
2022-07-22 17:25:53,884 - INFO - Cert OK
2022-07-22 17:25:53,884 - INFO - ------------------------------
2022-07-22 17:25:53,884 - INFO - Checking certificate on server www.google.com port 443
2022-07-22 17:25:53,982 - INFO - TLS Version: TLSv1.2
2022-07-22 17:25:53,982 - INFO - TLS Cipher: ECDHE-ECDSA-CHACHA20-POLY1305
2022-07-22 17:25:53,982 - INFO - Certificate Issuer: GTS CA 1C3, Google Trust Services LLC
2022-07-22 17:25:53,982 - INFO - SSL cert for www.google.com expires on 2022-09-26T08:25:17
2022-07-22 17:25:53,982 - INFO - 65 days, 10:59:23.017250
2022-07-22 17:25:53,982 - INFO - Cert OK
2022-07-22 17:25:53,983 - INFO - ------------------------------
2022-07-22 17:25:53,983 - INFO - Duration: 0.0000s  <function sendSlack at 0x10abdc7b8>
2022-07-22 17:25:53,984 - INFO - *** DONE ***
投稿コメント 0

Tagged with:
ssl