Monitoring SSL Certificates

Expired SSL certificates are still one of the leading causes of system outages, so don't get caught off guard: monitor for expiring SSL certificates.

C05348A3-9AB8-42C9-A6E0-81DB3AC59FEB
           

Can you believe that in this day and age, expired SSL Certificates are still one of the leading causes of system outages, even for large corporations!

Don't get caught off guard: monitoring the expiration date of SSL Certificates is easy with Python!

This Python script can be used to check the expiration date of SSL certificates and post an alert in Slack if the expiration date is within a given number of days. A message is also sent to Slack if all certificates are OK, just to let you know that the script did run.

Files can also be found in this GitHub repository.

To install the dependencies, please use:

pip3 install --upgrade certifi
pip3 install --upgrade urllib3
pip3 install --upgrade PyYAML
pip3 install --upgrade requests

The list of servers and ports to be checked is configurable by editing the ssl.yaml file. Multiple port numbers can be provided for each host. The Slack API token should be passed to the script as the slack_api environment variable.

www.bing.com:
  - 443
www.google.com:
  - 443

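The script is multi-threaded for performance and has HTTP retries enabled for the Slack notifications. Note that the script does NOT check certificate revocation (no CRL or OCSP checks are performed); a certificate that fails chain or hostname verification during the TLS handshake is reported as an error.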

By default, the script will alert you if any SSL certificate is set to expire in less than number_of_days days (the default value is 60). The script also expects the Slack webhook to be given via the environment variable slack_api: provide only the part of the webhook URL that follows https://hooks.slack.com/services/ (don't include the base Slack URL itself).

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import (absolute_import, division,
                        print_function, unicode_literals)

'''
For each URL and each port listed in the given configuration file, access the URL via HTTPS and retrieve the server's certificate.
Alert in Slack if the certificate is invalid or expiring soon.

'''

__author__ = "Videre Research, LLC"
__version__ = "1.0.2"


# I M P O R T S ###############################################################

import datetime
import os
import sys
import socket
import ssl
import yaml
import logging
import requests
import json
from threading import Thread
import certifi

if (sys.version_info > (3, 0)):
    from urllib.parse import urljoin
else:
    from urlparse import urljoin

from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry


# G L O B A L S ###############################################################


# Alert threshold: check_certificate() reports a certificate whose remaining
# lifetime is below this many days.
number_of_days = 60

# Slack webhook settings. webhookURL is filled in by main() from the
# environment variable named below and joined onto slack_base_url in sendSlack().
webhookURL = None
slack_base_url = 'https://hooks.slack.com/services/'
environment_variable_name = 'slack_api'

# Presentation of the Slack message (sender name, title line, emoji icon).
slack_user = 'SSL Certificate Validation'
slack_message_title = 'SSL Certificate Check'
slack_icon = ':shield:'

# Set to True by check_certificate() whenever a check alerts or fails;
# main() posts the "all OK" message only while this is still False.
hasErrors = False

# Configuration file (host -> list of ports), expected next to this script.
filepath = os.path.join(os.path.dirname(os.path.abspath(__file__)), "ssl.yaml")

# Configure the root logger at INFO level with a timestamped stream handler,
# then log a start-up banner with the script version and location.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
ch = logging.StreamHandler()
ch.setLevel(logging.INFO)
ch.setFormatter(formatter)
logger.addHandler(ch)
logger.info("-" * 80)
logger.info("Version:  %s" % (__version__))
logger.info("Path:    %s" % (os.path.realpath(__file__)))

# TLS context shared by every check_certificate() call.
# create_default_context() already turns on certificate verification,
# hostname checking and the recommended protocol options.
context = ssl.create_default_context()
# CERT_REQUIRED is a verify_mode value, not an options flag. Assigning it to
# context.options would replace the whole options bitmask and drop the
# hardening options (OP_NO_SSLv3, OP_NO_COMPRESSION, ...) set above.
context.verify_mode = ssl.CERT_REQUIRED
context.check_hostname = True
# The default verify_flags are kept as they are. The VERIFY_CRL_CHECK_* flags
# are deliberately NOT enabled: no CRLs are loaded into this context, so
# turning them on would make every handshake fail.
context.load_verify_locations(cafile=certifi.where())


# C O D E #####################################################################


def time_this(original_function):
    """Decorator that logs how long each call to ``original_function`` takes.

    The wrapped function's return value is passed through unchanged. The
    duration is logged at INFO level after the call returns; if the call
    raises, the exception propagates and no duration is logged.
    """
    # Local imports keep this block self-contained.
    import functools
    import time

    # wraps() keeps __name__ / __doc__ of the decorated function intact.
    @functools.wraps(original_function)
    def new_function(*args, **kwargs):
        # perf_counter() is monotonic, so wall-clock adjustments cannot
        # produce negative or inflated durations.
        before = time.perf_counter()
        x = original_function(*args, **kwargs)
        elapsed = time.perf_counter() - before
        logger.info('Duration: %.4fs  %s' % (elapsed, original_function))
        return x
    return new_function


def requests_retry_session(
    retries=3,
    backoff_factor=2.5,
    status_forcelist=(500, 502, 504),
    session=None,
):
    """Return a requests session whose HTTP and HTTPS adapters retry.

    retries          -- used as the total, read and connect retry limit.
    backoff_factor   -- back-off factor handed to the Retry policy.
    status_forcelist -- HTTP status codes handed to the Retry policy.
    session          -- existing session to configure; a new
                        requests.Session is created when omitted.
    """
    if not session:
        session = requests.Session()

    retry_policy = Retry(
        total=retries,
        read=retries,
        connect=retries,
        backoff_factor=backoff_factor,
        status_forcelist=status_forcelist,
    )
    retrying_adapter = HTTPAdapter(max_retries=retry_policy)

    # The same adapter instance serves both schemes.
    for scheme in ('http://', 'https://'):
        session.mount(scheme, retrying_adapter)
    return session


def get_environment_variable(myVariable):
    """Return the value of a required environment variable.

    The values "Y" and "N" are converted to True and False; any other
    non-empty value is returned as the string it was set to. When the
    variable is unset or empty, an error is logged and the process exits
    with status 1.
    """
    value = os.getenv(myVariable, '')
    if not value:
        logger.error('Environment variable ' + myVariable + ' not set, exiting')
        sys.exit(1)

    yes_no = {"Y": True, "N": False}
    return yes_no.get(value, value)


@time_this
def sendSlack(severity, body):
    """Post a message to the configured Slack webhook.

    severity -- 'Healthy', 'Minor', 'Major' or 'Critical'; selects the
                attachment color, and 'Major'/'Critical' also prepend an
                @channel mention. Any other value gets a neutral color.
    body     -- text of the attachment (also used as its fallback text).

    Returns None. Failures are logged, never raised, so a Slack outage does
    not abort the certificate checks.
    """
    # severity -> (attachment color, mention prefix)
    styles = {
        'Healthy': ('36a64f', ''),
        'Minor': ('FFD700', ''),
        'Major': ('FF8C00', '<!channel|channel> '),
        'Critical': ('FF4500', '<!channel|channel> '),
    }
    color, alert = styles.get(severity, ('2F4F4F', ''))

    headers = {'Content-Type': 'application/json', 'Accept': 'application/json'}
    data = {
        "username": slack_user,
        "icon_emoji": slack_icon,
        "text": alert + slack_message_title,
        "attachments": [
            {
                "fallback": body,
                "color": color,
                "text": body
            }
        ]
    }

    logger.info("Sending message to Slack")
    try:
        r = requests_retry_session().request(
            method='POST',
            url=urljoin(slack_base_url, webhookURL),
            data=json.dumps(data),
            headers=headers,
            timeout=25,
        )
    except Exception as x:
        # Only the exception class is logged.
        # NOTE(review): the exception text may contain the request URL and
        # therefore the webhook secret - confirm before logging more detail.
        logger.error('Connection failed :( %s' % x.__class__.__name__)
        return

    if r.status_code == 200:
        logger.info(r.content)
        # Logged only on success, so the log no longer claims delivery
        # after a failed or rejected request.
        logger.info('Sent to Slack')
    else:
        logger.error('Error: %s' % r.status_code)
        logger.debug(r.headers)
        logger.debug(r.text)


def get_issuer(ssl_info):
    """Return the certificate issuer as ``"<commonName>, <organizationName>"``.

    ssl_info is the certificate dict produced by ``getpeercert()``. Its
    'issuer' entry is a sequence of RDNs, each of which is a sequence of
    ``(name, value)`` pairs. Every pair of every RDN is inspected; if a
    name occurs more than once the last value wins. A missing name yields
    an empty string in its position.

    If the issuer entry does not have the expected shape, its ``str()``
    representation is returned instead. A missing 'issuer' key raises
    KeyError.
    """
    try:
        commonname = ""
        orgname = ""
        for rdn in ssl_info['issuer']:
            # An RDN may carry several attributes, so look at all of them.
            for name, value in rdn:
                if name == 'organizationName':
                    orgname = value
                elif name == 'commonName':
                    commonname = value
        return f'{commonname}, {orgname}'
    except (KeyError, IndexError, TypeError, ValueError):
        # Unexpected structure: fall back to the raw issuer value.
        return str(ssl_info['issuer'])


def check_certificate(hostname, port):
    """Check the TLS certificate served at hostname:port and alert in Slack.

    Connects with the shared module-level TLS context (10 second timeout),
    logs the TLS version, cipher, issuer and expiry date, and then:

    * certificate already expired         -> 'Critical' Slack message
    * fewer than number_of_days remaining -> 'Minor' Slack message
    * connection or handshake error       -> 'Major' Slack message

    The module-level hasErrors flag is set to True in all three cases.
    Returns None; exceptions are reported to Slack rather than raised.

    NOTE(review): the shared context verifies certificates, so an already
    expired certificate is likely rejected during the handshake and reported
    through the error path rather than the 'Critical' branch - confirm.
    """
    ssl_date_fmt = r'%b %d %H:%M:%S %Y %Z'
    global hasErrors
    conn = context.wrap_socket(
        socket.socket(socket.AF_INET),
        server_hostname=hostname,
    )
    conn.settimeout(10.0)

    slack_body = f"SSL Certificate: https://{hostname}:{port}\n"
    try:
        conn.connect((hostname, port))
        ssl_info = conn.getpeercert()

        logger.info(f"TLS Version: {conn.version()}")
        logger.info(f"TLS Cipher: {conn.cipher()[0]}")
        logger.info(f"Certificate Issuer: {get_issuer(ssl_info)}")
        # Parse the certificate's notAfter string into a naive datetime.
        expires = datetime.datetime.strptime(ssl_info['notAfter'], ssl_date_fmt)
        logger.info(f"SSL cert for {hostname} expires on {expires.isoformat()}")
        # Current UTC time as a naive datetime, so it can be subtracted from
        # the naive expiry value (replaces the deprecated utcnow()).
        now_utc = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
        remaining = expires - now_utc
        logger.info(remaining)
        if remaining < datetime.timedelta(days=0):
            # remaining is negative here; negate it so the message reads
            # "expired 3 days ago" rather than "expired -4 days ago".
            expired_days = (-remaining).days
            logger.info(f"Cert expired {expired_days} days ago")
            slack_body += f"Cert expired {expired_days} days ago"
            sendSlack('Critical', slack_body)
            hasErrors = True
        elif remaining < datetime.timedelta(days=number_of_days):
            logger.info(f"Cert expiring in {remaining.days} days")
            slack_body += f"Cert expiring in {remaining.days} days"
            sendSlack('Minor', slack_body)
            hasErrors = True
        else:
            logger.info("Cert OK")
    except Exception as e:
        logger.error("Error {0}".format(str(e)))
        slack_body += "Error {0}".format(str(e))
        sendSlack('Major', slack_body)
        hasErrors = True
    finally:
        conn.close()
        logger.info('-'*30)

def main():
    """Load the host/port list, check every certificate, report to Slack.

    Reads the webhook from the environment, loads the YAML configuration
    (a mapping of host name to a list of ports), runs one
    check_certificate() thread per host/port pair and waits for all of
    them. If no check set hasErrors, a 'Healthy' message is posted so that
    it is visible the script ran.

    Exits with status 1 when the environment variable or the configuration
    file is missing or the file cannot be parsed, and with status 0 otherwise.
    """
    global webhookURL

    webhookURL = get_environment_variable(environment_variable_name)

    if not os.path.isfile(filepath):
        logger.error('File not found: %s' % filepath)
        sys.exit(1)

    try:
        # The with-block closes the file handle, which was previously leaked.
        with open(filepath) as config_file:
            result = yaml.load(config_file, yaml.SafeLoader)
    except Exception as e:
        logger.error('YAML Error: %s' % e)
        sys.exit(1)

    logger.info(result)
    if not result:
        logger.warning('No servers configured in %s' % filepath)

    workers = []
    # An empty YAML file loads as None; a host without ports has ports=None.
    for server, ports in (result or {}).items():
        for port in ports or []:
            logger.info('Checking certificate on server %s port %s' % (server, port))
            # Pass the callable and its arguments separately. Calling
            # check_certificate(...) here would run the check synchronously
            # and hand its None result to Thread as the target.
            worker = Thread(target=check_certificate, args=(server, port), daemon=True)
            worker.start()
            workers.append(worker)

    # Wait for every check, not just the last one started, before reading
    # hasErrors.
    for worker in workers:
        worker.join()

    if not hasErrors:
        sendSlack('Healthy', 'All certificates checked are OK')

    logger.info('*** DONE ***')
    sys.exit(0)

###############################################################################


# Run the checks only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()

# E N D   O F   F I L E #######################################################

Sample Output:

python3 check_certificate_expiration.py
2022-07-22 17:25:53,751 - INFO - --------------------------------------------------------------------------------
2022-07-22 17:25:53,751 - INFO - Version:  1.0.2
2022-07-22 17:25:53,751 - INFO - Path:    /Users/me/SSL-Certificate-Expiration-Check/check_certificate_expiration.py
2022-07-22 17:25:53,767 - INFO - {'www.bing.com': [443], 'www.google.com': [443]}
2022-07-22 17:25:53,767 - INFO - Checking certificate on server www.bing.com port 443
2022-07-22 17:25:53,872 - INFO - TLS Version: TLSv1.2
2022-07-22 17:25:53,872 - INFO - TLS Cipher: ECDHE-RSA-AES256-GCM-SHA384
2022-07-22 17:25:53,872 - INFO - Certificate Issuer: Microsoft RSA TLS CA 01, Microsoft Corporation
2022-07-22 17:25:53,883 - INFO - SSL cert for www.bing.com expires on 2022-12-10T01:15:41
2022-07-22 17:25:53,883 - INFO - 140 days, 3:49:47.116068
2022-07-22 17:25:53,884 - INFO - Cert OK
2022-07-22 17:25:53,884 - INFO - ------------------------------
2022-07-22 17:25:53,884 - INFO - Checking certificate on server www.google.com port 443
2022-07-22 17:25:53,982 - INFO - TLS Version: TLSv1.2
2022-07-22 17:25:53,982 - INFO - TLS Cipher: ECDHE-ECDSA-CHACHA20-POLY1305
2022-07-22 17:25:53,982 - INFO - Certificate Issuer: GTS CA 1C3, Google Trust Services LLC
2022-07-22 17:25:53,982 - INFO - SSL cert for www.google.com expires on 2022-09-26T08:25:17
2022-07-22 17:25:53,982 - INFO - 65 days, 10:59:23.017250
2022-07-22 17:25:53,982 - INFO - Cert OK
2022-07-22 17:25:53,983 - INFO - ------------------------------
2022-07-22 17:25:53,983 - INFO - Duration: 0.0000s  <function sendSlack at 0x10abdc7b8>
2022-07-22 17:25:53,984 - INFO - *** DONE ***
Posted Comments: 0

Tagged with:
ssl