SSL Certificate Monitoring

Expired SSL certificates remain one of the leading causes of system outages, so don't get caught off guard: monitor your certificates' expiration dates.


Can you believe that, even today, expired SSL certificates remain one of the leading causes of system outages, even for large corporations?

Don't get caught off guard. Monitoring the expiration date of SSL certificates is easy with Python!

This Python script checks the expiration date of SSL certificates and posts an alert to Slack if a certificate expires within a given number of days. It also sends a message to Slack when all certificates are fine, just to let you know the script ran.

The files can also be found in this GitHub repository.

To install the dependencies, use:

pip3 install --upgrade certifi
pip3 install --upgrade urllib3
pip3 install --upgrade PyYAML
pip3 install --upgrade requests

The list of servers and ports to check is configured by editing the ssl.yaml file. Several port numbers can be listed for each host. The Slack API token must be passed to the script in the slack_api environment variable.

www.bing.com:
  - 443
www.google.com:
  - 443
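
As a rough illustration of how this mapping can be consumed once it is parsed, the sketch below flattens it into (host, port) pairs and rejects obviously invalid ports. The iter_targets helper is hypothetical and not part of the script, and the dictionary is only a stand-in for what the YAML parser returns for the file above.

```python
def iter_targets(config):
    """Yield (hostname, port) pairs from a {hostname: [ports]} mapping.

    Hypothetical helper for illustration; the script itself loops over
    the parsed YAML directly.
    """
    for hostname, ports in (config or {}).items():
        for port in ports or []:
            # bool is a subclass of int, so exclude it explicitly.
            is_int = isinstance(port, int) and not isinstance(port, bool)
            if not is_int or not 0 < port < 65536:
                raise ValueError(f"invalid port for {hostname}: {port!r}")
            yield hostname, port


# Stand-in for the parsed contents of ssl.yaml shown above.
config = {"www.bing.com": [443], "www.google.com": [443]}
targets = list(iter_targets(config))
print(targets)
```

Validating up front means a typo in the configuration fails fast instead of surfacing later as a connection error.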

The script is multithreaded for performance and has HTTP retries enabled. Note that the script does NOT validate the correctness of the certificate or the certificate chain itself; it relies on the checks Python's SSL context performs during the TLS handshake, and reports a failed handshake as an error.
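
One way to run the checks concurrently is sketched below, assuming a stand-in check function in place of the real certificate check. It uses concurrent.futures.ThreadPoolExecutor from the standard library rather than the raw Thread objects the script uses, so treat it as an alternative design and not the script's own code. The names check and run_checks are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor


def check(hostname, port):
    """Placeholder for a real certificate check; returns a status string."""
    return f"{hostname}:{port} checked"


def run_checks(targets, max_workers=8):
    """Run check() for every (hostname, port) pair on a thread pool.

    Results come back in the same order as the input targets.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # The callable and its arguments are passed separately so the work
        # happens on the pool's threads, not in the calling thread.
        futures = [pool.submit(check, host, port) for host, port in targets]
        return [future.result() for future in futures]


results = run_checks([("www.bing.com", 443), ("www.google.com", 443)])
print(results)
```

Leaving the with block waits for every submitted check, so no result is lost when the program exits.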

By default, the script alerts you if any SSL certificate is set to expire in fewer than number_of_days days (the default is 60). The script also expects to receive the Slack webhook URL through the slack_api environment variable (do not include the Slack base URL, https://hooks.slack.com/services/; pass only the part that follows it).
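
To see why only the trailing part of the webhook URL belongs in slack_api, the sketch below reproduces how the script combines the base URL and the variable's value with urljoin. The token value is a made-up placeholder, not a real webhook path.

```python
from urllib.parse import urljoin

slack_base_url = "https://hooks.slack.com/services/"

# Placeholder value, not a real webhook path.
token = "T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX"

# A relative path is appended to the base URL.
full_url = urljoin(slack_base_url, token)
print(full_url)

# A leading slash replaces the base path, which drops "/services/".
broken_url = urljoin(slack_base_url, "/" + token)
print(broken_url)
```

In other words, the value should not start with a slash, or the request goes to the wrong path.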

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import (absolute_import, division,
                        print_function, unicode_literals)
 
'''
For each URL and each port listed in the given configuration file, access the URL via HTTPS and retrieve the server's certificate.
Alert in Slack if the certificate is invalid or expiring soon.
 
'''
 
__author__ = "Videre Research, LLC"
__version__ = "1.0.2"
 
 
# I M P O R T S ###############################################################
 
import datetime
import os
import sys
import socket
import ssl
import yaml
import logging
import requests
import json
from threading import Thread
import certifi
 
# The script uses f-strings, so it only runs on Python 3.
from urllib.parse import urljoin
 
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
 
 
# G L O B A L S ###############################################################
 
 
number_of_days = 60
 
webhookURL = None
slack_base_url = 'https://hooks.slack.com/services/'
environment_variable_name = 'slack_api'
 
slack_user = 'SSL Certificate Validation'
slack_message_title = 'SSL Certificate Check'
slack_icon = ':shield:'
 
hasErrors = False
 
filepath = os.path.join(os.path.dirname(os.path.abspath(__file__)), "ssl.yaml")
 
logger = logging.getLogger()
logger.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
ch = logging.StreamHandler()
ch.setLevel(logging.INFO)
ch.setFormatter(formatter)
logger.addHandler(ch)
logger.info("-" * 80)
logger.info("Version:  %s" % (__version__))
logger.info("Path:    %s" % (os.path.realpath(__file__)))
 
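# create_default_context() already enables certificate verification and
# hostname checking; the two assignments below just make that explicit.
context = ssl.create_default_context()
context.check_hostname = True
context.verify_mode = ssl.CERT_REQUIRED
# CRL checking is left disabled: turning on VERIFY_CRL_CHECK_* without
# loading any CRLs would make every handshake fail.
context.load_verify_locations(cafile=certifi.where())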
 
 
# C O D E #####################################################################
 
 
def time_this(original_function):
    def new_function(*args, **kwargs):
        before = datetime.datetime.now()
        x = original_function(*args, **kwargs)
        after = datetime.datetime.now()
        global logger
        logger.info('Duration: %.4fs  %s' % ((after - before).total_seconds(), original_function))
        return x
    return new_function
 
 
def requests_retry_session(
    retries=3,
    backoff_factor=2.5,
    status_forcelist=(500, 502, 504),
    session=None,
):
    session = session or requests.Session()
    retry = Retry(
        total=retries,
        read=retries,
        connect=retries,
        backoff_factor=backoff_factor,
        status_forcelist=status_forcelist,
    )
    adapter = HTTPAdapter(max_retries=retry)
    session.mount('http://', adapter)
    session.mount('https://', adapter)
    return session
 
 
def get_environment_variable(myVariable):
    """Sets value based on a required environment variable."""
    if os.getenv(myVariable, '') == "":
        logger.error('Environment variable ' + myVariable + ' not set, exiting')
        sys.exit(1)
    if os.getenv(myVariable, '') == "Y":
        return True
    elif os.getenv(myVariable, '') == "N":
        return False
    else:
        return os.getenv(myVariable, '')
 
 
@time_this
def sendSlack(severity, body):
    """Send a given message to Slack."""
    global webhookURL
    if severity == 'Healthy':
        color = '36a64f'
        alert = ''
    elif severity == 'Minor':
        color = 'FFD700'
        alert = ''
    elif severity == 'Major':
        color = 'FF8C00'
        alert = '<!channel|channel> '
    elif severity == 'Critical':
        color = 'FF4500'
        alert = '<!channel|channel> '
    else:
        color = '2F4F4F'
        alert = ''
 
    headers = {'Content-Type': 'application/json', 'Accept': 'application/json'}
    data = {
        "username": slack_user,
        "icon_emoji": slack_icon,
        "text": alert + slack_message_title,
        "attachments": [
            {
                "fallback": body,
                "color": color,
                "text": body
            }
        ]
    }
 
    logger.info("Sending message to Slack")
    try:
        r = requests_retry_session().request(method='POST', url=urljoin(slack_base_url, webhookURL), data=json.dumps(data), headers=headers, timeout=25)
    except Exception as x:
        logger.error('Connection failed :( %s' % x.__class__.__name__)
    else:
        if r.status_code == 200:
            logger.info(r.text)
            # Only report success when Slack actually accepted the message.
            logger.info('Sent to Slack')
        else:
            logger.error('Error: %s' % r.status_code)
            logger.debug(r.headers)
            logger.debug(r.text)
 
 
def get_issuer(ssl_info):
    try:
        commonname = ""
        orgname = ""
        domain = []
        for entry in ssl_info['issuer']:
            if entry[0][0] == 'organizationName':
                orgname = entry[0][1]
            if entry[0][0] == 'commonName':
                commonname = entry[0][1]
            if entry[0][0] == 'domainComponent':
                domain.append(entry[0][1])
 
        return f'{commonname}, {orgname}'  # , {".".join(domain)}'
    except Exception:
        return str(ssl_info['issuer'])
 
 
def check_certificate(hostname, port):
    ssl_date_fmt = r'%b %d %H:%M:%S %Y %Z'
    global hasErrors
    global context
    conn = context.wrap_socket(
        socket.socket(socket.AF_INET),
        server_hostname=hostname,
    )
    conn.settimeout(10.0)
 
    slack_body = f"SSL Certificate: https://{hostname}:{port}\n"
    try:
        conn.connect((hostname, port))
        ssl_info = conn.getpeercert()
 
        # parse the string from the certificate into a Python datetime object
        logger.info(f"TLS Version: {conn.version()}")
        logger.info(f"TLS Cipher: {conn.cipher()[0]}")
        logger.info(f"Certificate Issuer: {get_issuer(ssl_info)}")
        expires = datetime.datetime.strptime(ssl_info['notAfter'], ssl_date_fmt)
        logger.info(f"SSL cert for {hostname} expires on {expires.isoformat()}")
        remaining = expires - datetime.datetime.utcnow()
        logger.info(remaining)
        if remaining < datetime.timedelta(days=0):
            # remaining is negative here; negate it to report a positive age.
            logger.info(f"Cert expired {(-remaining).days} days ago")
            slack_body += f"Cert expired {(-remaining).days} days ago"
            sendSlack('Critical', slack_body)
            hasErrors = True
        elif remaining < datetime.timedelta(days=number_of_days):
            logger.info(f"Cert expiring in {remaining.days} days")
            slack_body += f"Cert expiring in {remaining.days} days"
            sendSlack('Minor', slack_body)
            hasErrors = True
        else:
            logger.info("Cert OK")
    except Exception as e:
        logger.error("Error {0}".format(str(e)))
        slack_body += "Error {0}".format(str(e))
        sendSlack('Major', slack_body)
        hasErrors = True
    finally:
        conn.close()
        logger.info('-'*30)
 
 
def main():
    """Main function."""
    global webhookURL
 
    webhookURL = get_environment_variable(environment_variable_name)
 
    if os.path.isfile(filepath):
        try:
            with open(filepath) as f:
                result = yaml.safe_load(f)
        except Exception as e:
            logger.error('YAML Error: %s' % e)
            sys.exit(1)
    else:
        logger.error('File not found: %s' % filepath)
        sys.exit(1)
 
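    logger.info(result)
    workers = []
    for server, ports in result.items():
        for port in ports:
            logger.info('Checking certificate on server %s port %s' % (server, port))
            # Pass the callable and its arguments separately; calling
            # check_certificate(...) here would run it in the main thread.
            worker = Thread(target=check_certificate, args=(server, port), daemon=True)
            worker.start()
            workers.append(worker)

    # Wait for every check to finish, not just the last one started.
    for worker in workers:
        worker.join()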
 
    if not hasErrors:
        sendSlack('Healthy', 'All certificates checked are OK')
 
    logger.info('*** DONE ***')
    sys.exit(0)
 
###############################################################################
 
 
if __name__ == "__main__":
    main()
 
# E N D   O F   F I L E #######################################################

Sample output:

python3 check_certificate_expiration.py
2022-07-22 17:25:53,751 - INFO - --------------------------------------------------------------------------------
2022-07-22 17:25:53,751 - INFO - Version:  1.0.2
2022-07-22 17:25:53,751 - INFO - Path:    /Users/me/SSL-Certificate-Expiration-Check/check_certificate_expiration.py
2022-07-22 17:25:53,767 - INFO - {'www.bing.com': [443], 'www.google.com': [443]}
2022-07-22 17:25:53,767 - INFO - Checking certificate on server www.bing.com port 443
2022-07-22 17:25:53,872 - INFO - TLS Version: TLSv1.2
2022-07-22 17:25:53,872 - INFO - TLS Cipher: ECDHE-RSA-AES256-GCM-SHA384
2022-07-22 17:25:53,872 - INFO - Certificate Issuer: Microsoft RSA TLS CA 01, Microsoft Corporation
2022-07-22 17:25:53,883 - INFO - SSL cert for www.bing.com expires on 2022-12-10T01:15:41
2022-07-22 17:25:53,883 - INFO - 140 days, 3:49:47.116068
2022-07-22 17:25:53,884 - INFO - Cert OK
2022-07-22 17:25:53,884 - INFO - ------------------------------
2022-07-22 17:25:53,884 - INFO - Checking certificate on server www.google.com port 443
2022-07-22 17:25:53,982 - INFO - TLS Version: TLSv1.2
2022-07-22 17:25:53,982 - INFO - TLS Cipher: ECDHE-ECDSA-CHACHA20-POLY1305
2022-07-22 17:25:53,982 - INFO - Certificate Issuer: GTS CA 1C3, Google Trust Services LLC
2022-07-22 17:25:53,982 - INFO - SSL cert for www.google.com expires on 2022-09-26T08:25:17
2022-07-22 17:25:53,982 - INFO - 65 days, 10:59:23.017250
2022-07-22 17:25:53,982 - INFO - Cert OK
2022-07-22 17:25:53,983 - INFO - ------------------------------
2022-07-22 17:25:53,983 - INFO - Duration: 0.0000s  <function sendSlack at 0x10abdc7b8>
2022-07-22 17:25:53,984 - INFO - *** DONE ***
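
The expiry check at the heart of check_certificate can be sketched in isolation as follows. This is a minimal illustration and not the script's own code: classify_expiry is a hypothetical helper, it uses the standard library's ssl.cert_time_to_seconds instead of the strptime call in the script, and it works with timezone-aware datetimes. The severity names mirror the ones the script passes to sendSlack.

```python
import datetime
import ssl


def classify_expiry(not_after, now, threshold_days=60):
    """Return (severity, days_left) for a certificate 'notAfter' string.

    not_after uses the format returned by SSLSocket.getpeercert(),
    e.g. 'Dec 10 01:15:41 2022 GMT'. now must be timezone-aware.
    days_left is negative once the certificate has expired.
    """
    expires = datetime.datetime.fromtimestamp(
        ssl.cert_time_to_seconds(not_after), tz=datetime.timezone.utc
    )
    remaining = expires - now
    if remaining < datetime.timedelta(0):
        return "Critical", remaining.days
    if remaining < datetime.timedelta(days=threshold_days):
        return "Minor", remaining.days
    return "Healthy", remaining.days


# Fixed reference time (UTC) so the example is reproducible.
now = datetime.datetime(2022, 7, 22, 21, 25, 53, tzinfo=datetime.timezone.utc)
print(classify_expiry("Dec 10 01:15:41 2022 GMT", now))
print(classify_expiry("Sep 26 08:25:17 2022 GMT", now))
```

Passing the current time in as an argument keeps the function easy to test against fixed dates.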
