파일 및 폴더 처리

Python에서 많은 수의 파일과 디렉토리를 재귀적으로 처리하는 빠르고 효율적인 다중 스레드 방법입니다.

C05348A3-9AB8-42C9-A6E0-81DB3AC59FEB
           

저는 한때 Linux 서버에서 매우 많은(수백만) 파일과 디렉토리를 처리하는 임무를 받았습니다. 목표는 파일을 S3 버킷에 업로드하는 것이었으며, 이는 상당한 시간이 소요되는 작업이었습니다. 이 스크립트의 첫 번째 반복은 선형이었고 스크립트는 먼저 처리해야 하는 모든 파일 및 폴더 목록을 수집한 다음 업로드 프로세스를 시작합니다. 이로 인해 스크립트가 파일 목록을 수집할 때 몇 시간이 지연되어 시간 낭비처럼 보였습니다.

결과적으로 멀티스레딩을 추가하고 대기열을 사용하여 수행할 작업 목록을 저장하여 스크립트를 더 빠르고 효율적으로 만들기로 결정했습니다. 한 스레드는 파일 시스템을 재귀적으로 구문 분석하여 처리할 파일 및 폴더 목록을 가져오고 해당 정보를 대기열에 채웁니다. 추가 스레드는 큐가 정보로 채워지기를 기다리고 필요에 따라 각 파일과 디렉토리를 처리합니다. 이렇게 하면 전체 파일 목록이 완료될 때까지 기다릴 필요 없이 파일 처리를 시작할 수 있으므로 처리 시간이 빨라집니다.

스레드 수는 시스템의 CPU 코어 수를 기반으로 합니다. 나는 괴물 Solaris 시스템에서 이 스크립트를 실행했고 처리 스레드는 여전히 대기열이 채워지기를 기다리는 데 많은 시간을 보냈지만 "일반 크기" 시스템에서는 그렇지 않아야 합니다.


이 스크립트는 파일을 오브젝트 스토리지에 업로드하는 것부터 파일 이름을 바꾸는 것, Linux 시스템에서 UID 및/또는 GID를 변경하는 것까지 다양한 용도로 사용할 수 있습니다. 가능성은 무궁무진합니다. 파일과 폴더의 이름을 바꾸는 경우 몇 가지 분명한 주의 사항이 있습니다. 상위 폴더의 이름을 바꾸면 대기열에 기록된 경로가 분명히 더 이상 유효하지 않으므로 필요에 따라 스크립트를 조정하십시오!

이 스크립트에 포함된 샘플 처리는 단순히 파일 이름에서 대시를 제거하지만 다른 필요에 맞게 쉽게 조정할 수 있습니다. 제공된(주석 처리된) 또 다른 예는 Linux 파일 시스템에서 파일 및 디렉토리의 GID 및 UID를 수정하는 것입니다. 지정된 이전 -> 새 튜플 세트를 기반으로 하므로 필요에 따라 조정하십시오.

스크립트는 이 사이트에서 제공되거나 GitHub( https://github.com/Christophe-Gauge/list_files )에서 얻을 수도 있습니다.



list_files.py
usage: list_files.py [-h] [-v] [-l] path [path ...]
 
Recursively process files and folders.
 
positional arguments:
 path the path of the folder to be processed
 
optional arguments:
 -h, --help show this help message and exit
 -v, --verbose Output debug detail.
 -l, --links Process files that are symbolic links.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
 
'''
Recursively gather a list of files and directories for a given path
and processes files as needed.
 
Source: https://github.com/Christophe-Gauge/list_files
'''
 
# I M P O R T S ###############################################################
 
from __future__ import print_function
from __future__ import generators
import sys
import os
from queue import Queue
import threading
import datetime
import argparse
import traceback
import getpass
import time
import multiprocessing
import signal
import logging
from logging.handlers import RotatingFileHandler
import re
 
__author__ = "Videre Research, LLC."
__version__ = "1.0.5"
__license__ = "GNU General Public License v3.0"
 
 
# G L O B A L S ###############################################################
 
 
# Format is <current uid>: <new uid>
uid_to_change = {1001: 34273, 1002: 34313}
gid_to_change = {101: 10101, 100: 10045}
# user1 UID   1001 -> 34273
# user2 UID   1002 -> 34313
# group1 GID   101 -> 10101
# group2 GUD   100 -> 10045
 
directories_to_exclude = ['.snapshot']
files_to_exclude = ['.DS_Store']
 
num_threads = multiprocessing.cpu_count()
 
file_queue = Queue()
number_of_files_processed = 0
number_of_folders_processed = 0
number_of_files_modified = 0
number_of_folders_modified = 0
# transfer_total = 0
threadLock = threading.Lock()
is_done_listing_files = False
process_file_symbolic_links = False
before = None
 
intervals = (
    ('w', 604800),  # 60 * 60 * 24 * 7
    ('d', 86400),   # 60 * 60 * 24
    ('h', 3600),    # 60 * 60
    ('m', 60),
    ('s', 1),
)
 
logger = logging.getLogger()
logger.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
ch = logging.StreamHandler()
ch.setLevel(logging.INFO)
ch.setFormatter(formatter)
logger.addHandler(ch)
try:
    logFile = os.path.realpath(__file__).split('.')[0] + ".log"
    fh = RotatingFileHandler(logFile, maxBytes=(1048576 * 300), backupCount=7)
    fh.setLevel(logging.DEBUG)
    fh.setFormatter(formatter)
    logger.addHandler(fh)
    logger.info("Log file:    %s" % logFile)
except Exception as e:
    logger.warning("Unable to log to file: %s - %s" % (logFile, e))
logger.info("-" * 80)
logger.info("Version:  %s" % (__version__))
logger.info("Path:    %s" % (os.path.realpath(__file__)))
 
 
# F U N C T I O N S ###########################################################
 
 
def display_time(seconds, granularity=2):
    """Display time with the appropriate unit."""
    result = []
    for name, count in intervals:
        value = seconds // count
        if value:
            seconds -= value * count
            if value == 1:
                name = name.rstrip('s')
            result.append("{0:.0f}{1}".format(value, name))
    return ' '.join(result[:granularity])
 
 
def handler_stop_signals(signum, frame):
    """Handles the SIGTERM signal to stop script cleanly."""
    logger.info("Received %s signal, exiting." % signum)
    sys.exit(0)
 
 
def total_seconds(dt):
    # Keep backward compatibility with Python 2.6 which doesn't have
    # this method
    if hasattr(datetime, 'total_seconds'):
        return dt.total_seconds()
    else:
        return (dt.microseconds + (dt.seconds + dt.days * 24 * 3600) * 10**6) / 10**6
 
 
def GetHumanReadable(size, precision=2):
    """Transform file size to human readable number."""
    suffixes = [' B', 'KB', 'MB', 'GB', 'TB']
    suffixIndex = 0
    while size > 1024 and suffixIndex < 4:
        suffixIndex += 1  # increment the index of the suffix
        size = size / 1024.0  # apply the division
    return "%.*f %s" % (precision, size, suffixes[suffixIndex])
 
 
def ProcessFileThread(i, q):
    """This is the worker thread function that will process files in the queue."""
    global number_of_files_processed
    global number_of_files_modified
    global number_of_folders_processed
    global number_of_folders_modified
    global is_done_listing_files
    global before
    # global transfer_total
    is_done_processing = False
    while not is_done_processing:
        if q.empty():
            if is_done_listing_files:
                is_done_processing = True
                break
            else:
                logger.debug('%s - No files to process, thread waiting' % (i))
                time.sleep(7)
        else:
            item = q.get()
            if item is None:
                pass
            else:
                # print('%s - %s files left' % (i, q.qsize()))
                if not os.path.exists(item):
                    logger.error(item + ' does not exist')
                else:
                    with threadLock:
                        if os.path.isdir(item):
                            number_of_folders_processed += 1
                        else:
                            number_of_files_processed += 1
                            # transfer_total += float(os.path.getsize(item))
                    try:
                        full_file_name = os.path.basename(item)
                        file_path = os.path.dirname(item)
                        if full_file_name in files_to_exclude:
                            logger.warning(f'File {item} is in exclude list')
 
                        # =============== IF RENAMING FILES ===============
                        # We're not renaming directories because if will result in outdated file list in the queue
                        if os.path.isdir(item):
                            continue
                        # We're not processing hidden files, you can if you want
                        if full_file_name.startswith('.'):
                            continue
 
                        name_extension = os.path.splitext(full_file_name)
                        short_file_name = name_extension[0]
                        file_extension = name_extension[1]
                        # Just removing dashes from the file name, adjust as needed
                        new_file_name = short_file_name.replace(' - ', '').strip()
                        if new_file_name != '' and short_file_name != new_file_name:
                            new_file_name += file_extension
                            logger.info(f'File renamed: {item} -> {new_file_name}')
                            os.rename(item, os.path.join(file_path, new_file_name))
                            with threadLock:
                                number_of_files_modified += 1
 
                        # =============== IF CHANGING FILE OWNERSHIP ===============
                        # need_to_be_changed = False
                        # if os.path.islink(item):
                        #     st = os.lstat(item)
                        # else:
                        #     st = os.stat(item)
                        # logger.debug("{0:<2d} - mode: {1:<10} uid: {2:<10} gid: {3:<10} {4:<35}".format(i, str(oct(st.st_mode))[-4:], st.st_uid, st.st_gid, item))
                        # if st.st_uid in uid_to_change:
                        #     need_to_be_changed = True
                        #     new_uid = uid_to_change[st.st_uid]
                        #     logger.debug('%s - UID of %s will be changed to %s' % (i, st.st_uid, new_uid))
                        # else:
                        #     new_uid = st.st_uid
                        # if st.st_gid in gid_to_change:
                        #     need_to_be_changed = True
                        #     new_gid = gid_to_change[st.st_gid]
                        #     logger.debug('%s GID of %s will be changed to %s' % (i, st.st_gid, new_gid))
                        # else:
                        #     new_gid = st.st_gid
                        # if need_to_be_changed:
                        #     if os.path.islink(item):
                        #         os.lchown(item, new_uid, new_gid)
                        #         os.chown(item, new_uid, new_gid)
                        #     else:
                        #         os.chown(item, new_uid, new_gid)
                        #     # st = os.stat(item)
                        #     # if st.st_uid == new_uid and st.st_gid == new_gid:
                        #     logger.debug('%s - %s changed UID: %s -> %s  GID: %s -> %s' % (i, item, st.st_uid, new_uid, st.st_gid, new_gid))
                        #     # else:
                        #     #     logger.error('%s - %s failed to change UID: %s - %s  GID: %s - %s' % (i, item, st.st_uid, new_uid, st.st_gid, new_gid))
                        #     with threadLock:
                        #         if os.path.isdir(item):
                        #             number_of_folders_modified += 1
                        #         else:
                        #             number_of_files_modified += 1
                        after3 = datetime.datetime.now()
                        sys.stdout.write("\rProcessed {0:,} files in {1:,} directories, {2:,} files modified in {3}.".format(number_of_files_processed, number_of_folders_processed, number_of_files_modified, display_time(total_seconds(after3 - before))))
                        sys.stdout.flush()
                    except Exception as e:
                        logger.error('%s - failed to change: %s' % (i, item))
                        logger.error("Error {0}".format(str(e)))
                        logger.error(traceback.format_exc())
                    q.task_done()
    logger.info('%s - Thread DONE' % i)
    sys.exit(0)
 
 
def dirlist(q, base_path):
    """Add files and folders to the queue."""
    global is_done_listing_files
    before2 = datetime.datetime.now()
    for elem in dirwalk(base_path):
        # logger.debug(elem)
        q.put(elem)
    logger.info('Thread 0: DONE gathering list of files')
    after2 = datetime.datetime.now()
    logger.info('Duration of dirlist %s' % display_time(total_seconds(after2 - before2)))
    is_done_listing_files = True
    sys.exit(0)
 
 
def dirwalk(dir):
    """Recursively walk a directory tree, using a generator. Don't process directories that are links."""
    global process_file_symbolic_links
    for f in os.listdir(dir):
        fullpath = os.path.join(dir, f)
        if not os.path.islink(fullpath) or (os.path.islink(fullpath) and process_file_symbolic_links):
            yield fullpath
            if os.path.isdir(fullpath):
                if os.path.basename(fullpath) in directories_to_exclude:
                    logger.warning('Directory %s is in exclude list' % fullpath)
                elif os.path.islink(fullpath):
                    logger.warning('Skipping link directory %s' % fullpath)
                else:
                    try:
                        for x in dirwalk(fullpath):  # recurse into subdir
                            yield x
                    except Exception as e:
                        logger.error("Error {0}".format(str(e)))
                        logger.error(traceback.format_exc())
        else:
            logger.warning('Skipping link %s' % fullpath)
 
 
def main():
    """Main function."""
    global number_of_files_processed
    global number_of_files_modified
    global number_of_folders_processed
    global number_of_folders_modified
    global before
    global process_file_symbolic_links
    signal.signal(signal.SIGINT, handler_stop_signals)
    signal.signal(signal.SIGTERM, handler_stop_signals)
 
    parser = argparse.ArgumentParser(description='Recursively process files and folders.')
    parser.add_argument('directory_path', metavar='path', type=str, nargs='+',
                        help='the path of the folder to be processed')
 
    parser.add_argument(
        '-v', '--verbose', action='store_true',
        required=False, default=False,
        help='Output debug detail.')
 
    parser.add_argument(
        '-l', '--links', action='store_true',
        required=False, default=False,
        help='Process files that are symbolic links.')
 
    args = parser.parse_args()
 
    if args.verbose:
        logger.info('Verbose option passed, will show debug output')
        logger.setLevel(logging.DEBUG)
        ch.setLevel(logging.DEBUG)
    if args.links:
        process_file_symbolic_links = True
 
    base_path = os.path.abspath(args.directory_path[0])
    if not os.path.exists(base_path):
        logger.error('%s does not exist' % (base_path))
        sys.exit(1)
 
    if not os.path.isdir(base_path):
        logger.error('%s is NOT a directory' % (base_path))
        sys.exit(1)
 
    logger.info('Recursively processing files in %s' % (base_path))
    logger.info('Will create %s threads' % (num_threads))
    # logger.info('UID change list: %s' % (str(uid_to_change)))
    # logger.info('GID change list: %s' % (str(gid_to_change)))
    try:
        user = os.getlogin()
    except OSError as e:
        user = 'nobody'
    except Exception as e:
        user = 'unknown'
        logger.error("Error {0}".format(str(e)))
        logger.error(traceback.format_exc())
    if user != getpass.getuser():
        user = "%s as %s" % (user, getpass.getuser())
    logger.info("User:    %s\n" % (user))
    before = datetime.datetime.now()
 
    worker = threading.Thread(target=dirlist, args=(file_queue, base_path,))
    worker.setDaemon(True)
    worker.start()
 
    time.sleep(1)
 
    for i in range(num_threads):
        worker = threading.Thread(target=ProcessFileThread, args=(i + 1, file_queue,))
        worker.setDaemon(True)
        worker.start()
 
    logger.info('*** Main thread waiting')
    worker.join()
    logger.info('*** Main thread Done')
    time.sleep(1)
    after = datetime.datetime.now()
    logger.info('Duration  %s' % display_time(total_seconds(after - before)))
 
    logger.info('Processed {0:,} files'.format(number_of_files_processed))
    logger.info('Processed {0:,} directories'.format(number_of_folders_processed))
    logger.info('Modified  {0:,} files'.format(number_of_files_modified))
    logger.info('Modified  {0:,} directories'.format(number_of_folders_modified))
    # logger.info('%s' % GetHumanReadable(transfer_total))
 
    sys.stdout.flush()
    sys.stdout.close()
 
    sys.stderr.flush()
    sys.stderr.close()
    sys.exit(0)
 
 
###############################################################################
 
if __name__ == "__main__":
    main()
 
# E N D   O F   F I L E #######################################################
댓글을 게시했습니다: 0