SSL Certificate Monitoring

Expired SSL certificates are still one of the leading causes of outages, so don't get caught off guard: monitor your SSL certificates for upcoming expiration.


Can you believe that, in this day and age, expired SSL certificates are still one of the leading causes of outages, even at large companies?

Don't get caught off guard. Monitoring SSL certificate expiration dates is easy with Python!

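You can use this Python script to check the expiration dates of SSL certificates and post an alert to Slack when a certificate expires within a specified number of days. If all certificates are healthy, the script still sends a message to Slack to confirm that it ran.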

The files are also available in this GitHub repository.

To install the dependencies, use:

pip3 install --upgrade certifi
pip3 install --upgrade urllib3
pip3 install --upgrade PyYAML
pip3 install --upgrade requests

The list of servers and ports to check is configured by editing the ssl.yaml file. You can list multiple port numbers for each host. The Slack API token must be passed to the script in the slack_api environment variable.

www.bing.com:
  - 443
www.google.com:
  - 443
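
As a rough illustration of how such a host-to-ports mapping can be flattened into individual endpoints, here is a minimal sketch. It is not part of the script: the helper name iter_targets is hypothetical, the input is assumed to be the dictionary that a YAML loader returns for the file above, and the extra port 8443 is made up to show the multi-port case.

```python
def iter_targets(config):
    """Yield (hostname, port) pairs from a parsed ssl.yaml mapping."""
    for hostname, ports in (config or {}).items():
        # Accept a single port as well as a list of ports.
        if isinstance(ports, int):
            ports = [ports]
        for port in ports or []:
            port = int(port)
            if not 0 < port < 65536:
                raise ValueError(f"invalid port {port} for {hostname}")
            yield hostname, port


# This mapping mirrors what a YAML loader would return for the sample file;
# port 8443 is an illustrative addition, not part of the original sample.
sample_config = {"www.bing.com": [443], "www.google.com": [443, 8443]}
targets = list(iter_targets(sample_config))
print(targets)
```

Validating the port range up front means a typo in the configuration file is reported once, before any network connection is attempted.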

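The script is multi-threaded for performance and retries failed HTTP requests to Slack. Beyond the standard TLS handshake verification against the certifi CA bundle, it does not perform further validation of the certificate or its chain (for example, no revocation checks).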
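
One possible way to run the per-endpoint checks concurrently is a thread pool from the standard library. The sketch below is an alternative to the Thread objects used in the script, not the script's own code. The names run_checks and fake_check are hypothetical, and fake_check only stands in for a real certificate check so the example runs without network access.

```python
from concurrent.futures import ThreadPoolExecutor


def run_checks(targets, check, max_workers=8):
    """Run check(hostname, port) for every target in parallel threads.

    Returns a dict mapping (hostname, port) to the check's return value,
    or to the exception it raised.
    """
    results = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(check, host, port): (host, port)
                   for host, port in targets}
        for future, target in futures.items():
            try:
                results[target] = future.result()
            except Exception as exc:  # keep going if one endpoint fails
                results[target] = exc
    return results


def fake_check(hostname, port):
    """Stand-in for a real certificate check (hypothetical)."""
    if port != 443:
        raise ConnectionError(f"cannot reach {hostname}:{port}")
    return "OK"


results = run_checks([("www.bing.com", 443), ("www.google.com", 8443)], fake_check)
for target, outcome in results.items():
    print(target, outcome)
```

Collecting exceptions instead of letting them propagate keeps one unreachable host from hiding the results for the others.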

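By default, the script alerts when an SSL certificate expires in fewer than number_of_days days (60 by default). The script also expects the Slack webhook to be provided through the slack_api environment variable, without the https://hooks.slack.com/services/ prefix.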
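
The threshold logic can be sketched in isolation as follows. This is a simplified illustration, not the script's code. The function name classify_expiry is hypothetical, the severity labels are borrowed from the script, and the fixed "now" value is an assumption chosen so the example is reproducible.

```python
import datetime
import ssl


def classify_expiry(not_after, now, threshold_days=60):
    """Return (severity, days_left) for a certificate's notAfter string."""
    # ssl.cert_time_to_seconds parses strings such as
    # 'Dec 10 01:15:41 2022 GMT' into seconds since the epoch (UTC).
    expires = datetime.datetime.fromtimestamp(
        ssl.cert_time_to_seconds(not_after), tz=datetime.timezone.utc)
    remaining = expires - now
    if remaining < datetime.timedelta(0):
        return "Critical", remaining.days
    if remaining < datetime.timedelta(days=threshold_days):
        return "Minor", remaining.days
    return "Healthy", remaining.days


# Assumed fixed point in time, so the results do not depend on the clock.
now = datetime.datetime(2022, 7, 22, 21, 25, 53, tzinfo=datetime.timezone.utc)
print(classify_expiry("Dec 10 01:15:41 2022 GMT", now))                     # ('Healthy', 140)
print(classify_expiry("Sep 26 08:25:17 2022 GMT", now, threshold_days=90))  # ('Minor', 65)
```

Using timezone-aware datetimes on both sides of the subtraction avoids mixing local time with the UTC timestamps stored in certificates.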

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import (absolute_import, division,
                        print_function, unicode_literals)
 
'''
For each URL and each port listed in the given configuration file, access the URL via HTTPS and retrieve the server's certificate.
Alert in Slack if the certificate is invalid or expiring soon.
 
'''
 
__author__ = "Videre Research, LLC"
__version__ = "1.0.2"
 
 
# I M P O R T S ###############################################################
 
import datetime
import os
import sys
import socket
import ssl
import yaml
import logging
import requests
import json
from threading import Thread
import certifi
 
if (sys.version_info > (3, 0)):
    from urllib.parse import urljoin
else:
    from urlparse import urljoin
 
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
 
 
# G L O B A L S ###############################################################
 
 
number_of_days = 60
 
webhookURL = None
slack_base_url = 'https://hooks.slack.com/services/'
environment_variable_name = 'slack_api'
 
slack_user = 'SSL Certificate Validation'
slack_message_title = 'SSL Certificate Check'
slack_icon = ':shield:'
 
hasErrors = False
 
filepath = os.path.join(os.path.dirname(os.path.abspath(__file__)), "ssl.yaml")
 
logger = logging.getLogger()
logger.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
ch = logging.StreamHandler()
ch.setLevel(logging.INFO)
ch.setFormatter(formatter)
logger.addHandler(ch)
logger.info("-" * 80)
logger.info("Version:  %s" % (__version__))
logger.info("Path:    %s" % (os.path.realpath(__file__)))
 
context = ssl.create_default_context()
# Require a trusted certificate and a matching hostname.
# CERT_REQUIRED is a verify_mode value, not an options flag.
# Revocation (CRL) checking is left disabled.
context.check_hostname = True
context.verify_mode = ssl.CERT_REQUIRED
context.load_verify_locations(cafile=certifi.where())
 
 
# C O D E #####################################################################
 
 
def time_this(original_function):
    def new_function(*args, **kwargs):
        before = datetime.datetime.now()
        x = original_function(*args, **kwargs)
        after = datetime.datetime.now()
        global logger
        logger.info('Duration: %.4fs  %s' % ((after - before).total_seconds(), original_function))
        return x
    return new_function
 
 
def requests_retry_session(
    retries=3,
    backoff_factor=2.5,
    status_forcelist=(500, 502, 504),
    session=None,
):
    session = session or requests.Session()
    retry = Retry(
        total=retries,
        read=retries,
        connect=retries,
        backoff_factor=backoff_factor,
        status_forcelist=status_forcelist,
    )
    adapter = HTTPAdapter(max_retries=retry)
    session.mount('http://', adapter)
    session.mount('https://', adapter)
    return session
 
 
def get_environment_variable(myVariable):
    """Sets value based on a required environment variable."""
    if os.getenv(myVariable, '') == "":
        logger.error('Environment variable ' + myVariable + ' not set, exiting')
        sys.exit(1)
    if os.getenv(myVariable, '') == "Y":
        return True
    elif os.getenv(myVariable, '') == "N":
        return False
    else:
        return os.getenv(myVariable, '')
 
 
@time_this
def sendSlack(severity, body):
    """Send a given message to Slack."""
    global webhookURL
    if severity == 'Healthy':
        color = '36a64f'
        alert = ''
    elif severity == 'Minor':
        color = 'FFD700'
        alert = ''
    elif severity == 'Major':
        color = 'FF8C00'
        alert = '<!channel|channel> '
    elif severity == 'Critical':
        color = 'FF4500'
        alert = '<!channel|channel> '
    else:
        color = '2F4F4F'
        alert = ''
 
    headers = {'Content-Type': 'application/json', 'Accept': 'application/json'}
    data = {
        "username": slack_user,
        "icon_emoji": slack_icon,
        "text": alert + slack_message_title,
        "attachments": [
            {
                "fallback": body,
                "color": color,
                "text": body
            }
        ]
    }
 
    logger.info("Sending message to Slack")
    try:
        r = requests_retry_session().request(method='POST', url=urljoin(slack_base_url, webhookURL), data=json.dumps(data), headers=headers, timeout=25)
    except Exception as x:
        logger.error('Connection failed :( %s' % x.__class__.__name__)
    else:
        if r.status_code == 200:
            response = r.content
            logger.info(response)
        else:
            logger.error('Error: %s' % r.status_code)
            logger.debug(r.headers)
            logger.debug(r.text)
            logger.debug(sys.exc_info()[:2])
    finally:
        logger.info('Sent to Slack')
 
 
def get_issuer(ssl_info):
    try:
        commonname = ""
        orgname = ""
        domain = []
        for entry in ssl_info['issuer']:
            if entry[0][0] == 'organizationName':
                orgname = entry[0][1]
            if entry[0][0] == 'commonName':
                commonname = entry[0][1]
            if entry[0][0] == 'domainComponent':
                domain.append(entry[0][1])
 
        return f'{commonname}, {orgname}'  # , {".".join(domain)}'
    except Exception:
        return str(ssl_info['issuer'])
 
 
def check_certificate(hostname, port):
    ssl_date_fmt = r'%b %d %H:%M:%S %Y %Z'
    global hasErrors
    global context
    conn = context.wrap_socket(
        socket.socket(socket.AF_INET),
        server_hostname=hostname,
    )
    conn.settimeout(10.0)
 
    slack_body = f"SSL Certificate: https://{hostname}:{port}\n"
    try:
        conn.connect((hostname, port))
        ssl_info = conn.getpeercert()
 
        # parse the string from the certificate into a Python datetime object
        logger.info(f"TLS Version: {conn.version()}")
        logger.info(f"TLS Cipher: {conn.cipher()[0]}")
        logger.info(f"Certificate Issuer: {get_issuer(ssl_info)}")
        expires = datetime.datetime.strptime(ssl_info['notAfter'], ssl_date_fmt)
        logger.info(f"SSL cert for {hostname} expires on {expires.isoformat()}")
        remaining = expires - datetime.datetime.utcnow()
        logger.info(remaining)
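        if remaining < datetime.timedelta(days=0):
            # remaining is negative here; negate it to report a positive day count
            expired_days = (-remaining).days
            logger.info(f"Cert expired {expired_days} days ago")
            slack_body += f"Cert expired {expired_days} days ago"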
            sendSlack('Critical', slack_body)
            hasErrors = True
        elif remaining < datetime.timedelta(days=number_of_days):
            logger.info(f"Cert expiring in {remaining.days} days")
            slack_body += f"Cert expiring in {remaining.days} days"
            sendSlack('Minor', slack_body)
            hasErrors = True
        else:
            logger.info("Cert OK")
    except Exception as e:
        logger.error("Error {0}".format(str(e)))
        slack_body += "Error {0}".format(str(e))
        sendSlack('Major', slack_body)
        hasErrors = True
    finally:
        conn.close()
        logger.info('-'*30)
 
 
def main():
    """Main function."""
    global webhookURL
 
    webhookURL = get_environment_variable(environment_variable_name)
 
    if os.path.isfile(filepath):
        try:
            result = yaml.load(open(filepath), yaml.SafeLoader)
        except Exception as e:
            logger.error('YAML Error: %s' % e)
            sys.exit(1)
    else:
        logger.error('File not found: %s' % filepath)
        sys.exit(1)
 
    logger.info(result)
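    workers = []
    for server, ports in result.items():
        for port in ports:
            logger.info('Checking certificate on server %s port %s' % (server, port))
            # Pass the function and its arguments separately so the check runs inside the thread
            worker = Thread(target=check_certificate, args=(server, port), daemon=True)
            worker.start()
            workers.append(worker)

    # Wait for every check to finish, not only the last one started
    for worker in workers:
        worker.join()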
 
    if not hasErrors:
        sendSlack('Healthy', 'All certificates checked are OK')
 
    logger.info('*** DONE ***')
    sys.exit(0)
 
###############################################################################
 
 
if __name__ == "__main__":
    main()
 
# E N D   O F   F I L E #######################################################

Sample output:

python3 check_certificate_expiration.py
2022-07-22 17:25:53,751 - INFO - --------------------------------------------------------------------------------
2022-07-22 17:25:53,751 - INFO - Version:  1.0.2
2022-07-22 17:25:53,751 - INFO - Path:    /Users/me/SSL-Certificate-Expiration-Check/check_certificate_expiration.py
2022-07-22 17:25:53,767 - INFO - {'www.bing.com': [443], 'www.google.com': [443]}
2022-07-22 17:25:53,767 - INFO - Checking certificate on server www.bing.com port 443
2022-07-22 17:25:53,872 - INFO - TLS Version: TLSv1.2
2022-07-22 17:25:53,872 - INFO - TLS Cipher: ECDHE-RSA-AES256-GCM-SHA384
2022-07-22 17:25:53,872 - INFO - Certificate Issuer: Microsoft RSA TLS CA 01, Microsoft Corporation
2022-07-22 17:25:53,883 - INFO - SSL cert for www.bing.com expires on 2022-12-10T01:15:41
2022-07-22 17:25:53,883 - INFO - 140 days, 3:49:47.116068
2022-07-22 17:25:53,884 - INFO - Cert OK
2022-07-22 17:25:53,884 - INFO - ------------------------------
2022-07-22 17:25:53,884 - INFO - Checking certificate on server www.google.com port 443
2022-07-22 17:25:53,982 - INFO - TLS Version: TLSv1.2
2022-07-22 17:25:53,982 - INFO - TLS Cipher: ECDHE-ECDSA-CHACHA20-POLY1305
2022-07-22 17:25:53,982 - INFO - Certificate Issuer: GTS CA 1C3, Google Trust Services LLC
2022-07-22 17:25:53,982 - INFO - SSL cert for www.google.com expires on 2022-09-26T08:25:17
2022-07-22 17:25:53,982 - INFO - 65 days, 10:59:23.017250
2022-07-22 17:25:53,982 - INFO - Cert OK
2022-07-22 17:25:53,983 - INFO - ------------------------------
2022-07-22 17:25:53,983 - INFO - Duration: 0.0000s  <function sendSlack at 0x10abdc7b8>
2022-07-22 17:25:53,984 - INFO - *** DONE ***