
Optimizing PostgreSQL backups:
a performance analysis of pg_dump across formats and compression settings

We propose a series of tests: 21 backups of a PostgreSQL database taken with pg_dump across its different formats, with compression levels ranging from 1 to 22.

After that, we verify each restore and analyze the results to identify the most effective approaches for typical scenarios. All the details and results are presented below.
Testing goals
Suppose the task is to determine the optimal format for backups created with pg_dump, where optimality is defined as a balance between backup speed, restore time, and the resulting file size.

Key requirements:

  • Files are compressed before being shipped off the server, to save network bandwidth.
  • The output must be a single file, for compatibility with object storage such as S3 (see the sketch after this list).
  • The process must not depend on any prior server-side setup, which rules out PgBackRest, WAL-G, and pg_basebackup and keeps the tool universal for local installations, Docker containers, DBaaS, and other configurations.
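To illustrate the single-file requirement, here is a minimal sketch (not part of the benchmark itself): a custom-format dump is one file, so it can be shipped to S3-compatible storage as-is. The connection values, bucket name, and key are placeholders; credentials are assumed to come from the environment.
import os
import subprocess

import boto3

dump_path = "testdb.dump"

# Take a single-file, zstd-compressed custom-format dump (placeholder connection values).
subprocess.run(
    [
        "pg_dump",
        "-h", "localhost", "-p", "7000", "-U", "postgres", "-d", "testdb",
        "--format", "custom", "--compress", "zstd:5",
        "-f", dump_path,
    ],
    check=True,
    env={**os.environ, "PGPASSWORD": "testpassword"},
)

# Upload the resulting single file to an S3-compatible bucket (bucket and key are hypothetical).
boto3.client("s3").upload_file(dump_path, "my-backup-bucket", f"pg/{dump_path}")
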
Overview of pg_dump formats and compression methods
The pg_dump utility supports four backup formats: plain, custom, directory, and tar.
Custom and Directory deserve special attention because they support parallel operations: Custom produces a single file and allows parallel restore, while Directory creates a folder of files and supports both parallel dump and parallel restore.
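For reference, a sketch of how the two parallel-capable formats are typically invoked (connection options omitted; these are not the exact benchmark commands, which are built in the measurement script further below):
# Custom (-Fc): a single output file; the dump runs in one process,
# but pg_restore can use several jobs.
custom_dump    = ["pg_dump", "-Fc", "-d", "testdb", "-f", "backup.dump"]
custom_restore = ["pg_restore", "-j", "4", "-d", "testdb", "backup.dump"]

# Directory (-Fd): a folder with one file per table; both the dump and the restore
# can be parallelized with -j.
directory_dump    = ["pg_dump", "-Fd", "-j", "4", "-d", "testdb", "-f", "backup_dir"]
directory_restore = ["pg_restore", "-j", "4", "-d", "testdb", "backup_dir"]
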

The following compression methods are available for these formats: gzip, lz4, and zstd.
The headline figures for these algorithms assume ideal conditions. In a real database, part of the time goes to database-specific operations, which can reduce compression efficiency, especially when data is compressed on the fly.

We expect Custom with gzip (as the universal option) and with zstd (as the newer algorithm) to show the best results. Note that pg_dump supports zstd starting with PostgreSQL 16.
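The compression syntax pg_dump accepts, shown here as a sketch (paths are placeholders; the same flags appear later in the measurement script):
# Classic gzip level via -Z, and the method:level form for lz4/zstd
# (the latter requires PostgreSQL 16+ client tools).
gzip_level_5 = ["pg_dump", "-Fc", "-Z", "5", "-d", "testdb", "-f", "backup_gzip5.dump"]
zstd_level_5 = ["pg_dump", "-Fc", "--compress", "zstd:5", "-d", "testdb", "-f", "backup_zstd5.dump"]
lz4_level_5  = ["pg_dump", "-Fd", "-j", "4", "--compress", "lz4:5", "-d", "testdb", "-f", "backup_lz4_dir"]
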
PostgreSQL configuration
We deploy two PostgreSQL instances via Docker Compose: one to take backups from, the other to restore them into. Non-standard ports (7000 and 7001) are used to avoid conflicts with existing installations.
version: "3.8"
services:
  db:
    image: postgres:17
    container_name: db
    environment:
      POSTGRES_DB: testdb
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: testpassword
    ports:
      - "7000:7000"
    command: -p 7000
    volumes:
      - ./pgdata:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U postgres -d testdb -p 7000"]
      interval: 10s
      timeout: 5s
      retries: 5
    restart: unless-stopped

  db-restore:
    image: postgres:17
    container_name: db-restore
    environment:
      POSTGRES_DB: testdb
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: testpassword
    ports:
      - "7001:7001"
    command: -p 7001
    volumes:
      - ./pgdata-restore:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U postgres -d testdb -p 7001"]
      interval: 10s
      timeout: 5s
      retries: 5
    restart: unless-stopped
    depends_on:
      - db
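Once both containers are up, a quick sanity check (a sketch using the same connection settings as the scripts below) confirms that both instances answer on their ports:
import psycopg2

for port in (7000, 7001):
    conn = psycopg2.connect(host="localhost", port=port, dbname="testdb",
                            user="postgres", password="testpassword")
    with conn.cursor() as cur:
        cur.execute("SELECT version()")
        print(port, cur.fetchone()[0])
    conn.close()
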
Next, postgresql.conf is tuned so the database makes proper use of its resources. The test machine is an AMD Ryzen 9 7950X (16 cores, 32 threads) with 64 GB of RAM and a 1 TB NVMe drive; the parameters below are generated with PgTune for 4 cores and 16 GB of memory:
# DB Version: 17
# OS Type: linux
# DB Type: web
# Total Memory (RAM): 16 GB
# CPUs num: 4
# Connections num: 100
# Data Storage: ssd

max_connections = 100
shared_buffers = 4GB
effective_cache_size = 12GB
maintenance_work_mem = 1GB
checkpoint_completion_target = 0.9
wal_buffers = 16MB
default_statistics_target = 100
random_page_cost = 1.1
effective_io_concurrency = 200
work_mem = 40329kB
huge_pages = off
min_wal_size = 1GB
max_wal_size = 4GB
max_worker_processes = 4
max_parallel_workers_per_gather = 2
max_parallel_workers = 4
max_parallel_maintenance_workers = 2

listen_addresses = '*'
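To verify that the settings actually took effect after a restart, here is a small sketch that reads a few of them back over SQL (how postgresql.conf is mounted into the container is left out):
import psycopg2

conn = psycopg2.connect(host="localhost", port=7000, dbname="testdb",
                        user="postgres", password="testpassword")
with conn.cursor() as cur:
    for name in ("shared_buffers", "work_mem", "max_parallel_workers", "max_wal_size"):
        cur.execute("SHOW " + name)  # SHOW does not accept bind parameters
        print(f"{name} = {cur.fetchone()[0]}")
conn.close()
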
Generating test data
We create a test database of roughly 11 GB with 3 tables and 10 indexes. The data is deliberately varied to cover different scenarios, since a general-purpose benchmark calls for averaged, representative results.

The table structures and indexes are defined in the script below.
The data is generated with a Python script. The process:

  • Rows are written in batches of 25,000.
  • Each round inserts 100,000 rows into each table via COPY.
  • Once the database reaches 10 GB, the indexes are created.
The full script:
#!/usr/bin/env python3
"""
Script to populate a PostgreSQL database with ~10GB of test data for backup performance testing.
Inserts 100,000 rows into each of 3 tables per round until the 10GB target is reached.
"""

import psycopg2
import random
import string
import time
import io
from datetime import datetime, timedelta
import sys


# Database connection parameters
DB_CONFIG = {
    "host": "localhost",
    "port": 7000,
    "database": "testdb",
    "user": "postgres",
    "password": "testpassword",
}

# Target database size in bytes (10GB)
TARGET_SIZE_GB = 10
TARGET_SIZE_BYTES = TARGET_SIZE_GB * 1024 * 1024 * 1024

# Rows per table per round
ROWS_PER_TABLE = 100000


def generate_random_string(length):
    """Generate a random string of specified length."""
    return "".join(random.choices(string.ascii_letters + string.digits + " ", k=length))


def generate_random_date():
    """Generate a random date within the last 5 years."""
    start_date = datetime.now() - timedelta(days=5 * 365)
    random_days = random.randint(0, 5 * 365)
    return start_date + timedelta(days=random_days)


def create_tables(cursor):
    """Create the 3 test tables with various data types."""
    print("Creating 3 tables...")

    # Table 1: Large table with mixed data types
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS large_test_table (
            id BIGSERIAL PRIMARY KEY,
            name VARCHAR(100),
            description TEXT,
            email VARCHAR(255),
            phone VARCHAR(20),
            address TEXT,
            city VARCHAR(100),
            country VARCHAR(100),
            postal_code VARCHAR(20),
            birth_date DATE,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            salary DECIMAL(10,2),
            is_active BOOLEAN DEFAULT TRUE,
            rating FLOAT,
            notes TEXT,
            department VARCHAR(100),
            employee_id VARCHAR(50)
        )
    """)

    # Table 2: Orders table for transactional data
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS orders (
            id BIGSERIAL PRIMARY KEY,
            user_id BIGINT,
            order_number VARCHAR(50) UNIQUE,
            total_amount DECIMAL(12,2),
            order_date TIMESTAMP,
            status VARCHAR(20),
            shipping_address TEXT,
            notes TEXT,
            payment_method VARCHAR(50),
            shipping_method VARCHAR(50),
            discount_amount DECIMAL(10,2),
            product_list TEXT,
            customer_notes TEXT
        )
    """)

    # Table 3: Activity logs with lots of text data
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS activity_logs (
            id BIGSERIAL PRIMARY KEY,
            user_id BIGINT,
            action VARCHAR(100),
            details TEXT,
            ip_address INET,
            user_agent TEXT,
            timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            session_id VARCHAR(100),
            browser VARCHAR(50),
            operating_system VARCHAR(50),
            referrer TEXT,
            response_time INTEGER,
            error_message TEXT
        )
    """)

    print("3 tables created successfully!")


def create_indexes(cursor):
    """Create indexes for all 3 tables."""
    print("Creating indexes...")

    # Indexes for large_test_table
    cursor.execute(
        "CREATE INDEX IF NOT EXISTS idx_large_test_name ON large_test_table(name)"
    )
    cursor.execute(
        "CREATE INDEX IF NOT EXISTS idx_large_test_email ON large_test_table(email)"
    )
    cursor.execute(
        "CREATE INDEX IF NOT EXISTS idx_large_test_created_at ON large_test_table(created_at)"
    )
    cursor.execute(
        "CREATE INDEX IF NOT EXISTS idx_large_test_department ON large_test_table(department)"
    )

    # Indexes for orders
    cursor.execute("CREATE INDEX IF NOT EXISTS idx_orders_user_id ON orders(user_id)")
    cursor.execute(
        "CREATE INDEX IF NOT EXISTS idx_orders_order_date ON orders(order_date)"
    )
    cursor.execute("CREATE INDEX IF NOT EXISTS idx_orders_status ON orders(status)")

    # Indexes for activity_logs
    cursor.execute(
        "CREATE INDEX IF NOT EXISTS idx_activity_user_id ON activity_logs(user_id)"
    )
    cursor.execute(
        "CREATE INDEX IF NOT EXISTS idx_activity_timestamp ON activity_logs(timestamp)"
    )
    cursor.execute(
        "CREATE INDEX IF NOT EXISTS idx_activity_action ON activity_logs(action)"
    )

    print("Indexes created successfully!")


def drop_indexes(cursor):
    """Drop indexes for faster bulk loading."""
    print("Dropping indexes for bulk loading...")

    indexes_to_drop = [
        "idx_large_test_name",
        "idx_large_test_email",
        "idx_large_test_created_at",
        "idx_large_test_department",
        "idx_orders_user_id",
        "idx_orders_order_date",
        "idx_orders_status",
        "idx_activity_user_id",
        "idx_activity_timestamp",
        "idx_activity_action",
    ]

    for index in indexes_to_drop:
        try:
            cursor.execute(f"DROP INDEX IF EXISTS {index}")
        except Exception:
            pass

    print("Indexes dropped!")


def print_progress(inserted, total, start_time, operation_name):
    """Print detailed progress information."""
    current_time = time.time()
    elapsed = current_time - start_time
    rate = inserted / elapsed if elapsed > 0 else 0
    percentage = (inserted / total) * 100

    if rate > 0:
        eta_seconds = (total - inserted) / rate
        eta_minutes = eta_seconds / 60
        eta_str = f"ETA: {eta_minutes:.1f}m"
    else:
        eta_str = "ETA: calculating..."

    print(
        f"{operation_name}: {inserted:,} / {total:,} ({percentage:.1f}%) | "
        f"Rate: {rate:,.0f} rows/sec | Elapsed: {elapsed / 60:.1f}m | {eta_str}"
    )


def populate_large_table_batch(cursor, count=ROWS_PER_TABLE):
    """Populate large_test_table with specified number of rows."""
    print(f"Inserting {count:,} rows into large_test_table...")

    batch_size = 25000
    inserted = 0
    start_time = time.time()

    while inserted < count:
        current_batch_size = min(batch_size, count - inserted)
        data_buffer = io.StringIO()

        for i in range(current_batch_size):
            name = (
                generate_random_string(random.randint(20, 100))
                .replace("\t", " ")
                .replace("\n", " ")
            )
            description = (
                generate_random_string(random.randint(100, 500))
                .replace("\t", " ")
                .replace("\n", " ")
            )
            email = f"{generate_random_string(10)}@{generate_random_string(10)}.com"
            phone = f"+1-{random.randint(100, 999)}-{random.randint(100, 999)}-{random.randint(1000, 9999)}"
            address = (
                generate_random_string(random.randint(50, 200))
                .replace("\t", " ")
                .replace("\n", " ")
            )
            city = (
                generate_random_string(random.randint(10, 50))
                .replace("\t", " ")
                .replace("\n", " ")
            )
            country = (
                generate_random_string(random.randint(5, 30))
                .replace("\t", " ")
                .replace("\n", " ")
            )
            postal_code = f"{random.randint(10000, 99999)}"
            birth_date = generate_random_date().date()
            salary = f"{random.randint(30000, 150000) + random.random():.2f}"
            is_active = "t" if random.choice([True, False]) else "f"
            rating = f"{random.uniform(1.0, 5.0):.2f}"
            notes = (
                generate_random_string(random.randint(50, 300))
                .replace("\t", " ")
                .replace("\n", " ")
            )
            department = random.choice(
                ["HR", "Engineering", "Sales", "Marketing", "Finance", "Operations"]
            )
            employee_id = (
                f"EMP-{random.randint(1000, 9999)}-{generate_random_string(4)}"
            )

            data_buffer.write(
                f"{name}\t{description}\t{email}\t{phone}\t{address}\t{city}\t{country}\t{postal_code}\t{birth_date}\t{salary}\t{is_active}\t{rating}\t{notes}\t{department}\t{employee_id}\n"
            )

        data_buffer.seek(0)
        cursor.copy_from(
            data_buffer,
            "large_test_table",
            columns=(
                "name",
                "description",
                "email",
                "phone",
                "address",
                "city",
                "country",
                "postal_code",
                "birth_date",
                "salary",
                "is_active",
                "rating",
                "notes",
                "department",
                "employee_id",
            ),
            sep="\t",
        )

        inserted += current_batch_size
        if inserted % 100000 == 0:
            print_progress(inserted, count, start_time, "Large table")

    elapsed = time.time() - start_time
    print(
        f"Completed large_test_table: {inserted:,} rows in {elapsed:.2f}s ({inserted / elapsed:,.0f} rows/sec)"
    )


def populate_orders_batch(cursor, count=ROWS_PER_TABLE):
    """Populate orders table with specified number of rows."""
    print(f"Inserting {count:,} rows into orders...")

    batch_size = 25000
    inserted = 0
    start_time = time.time()

    while inserted < count:
        current_batch_size = min(batch_size, count - inserted)
        data_buffer = io.StringIO()

        for i in range(current_batch_size):
            user_id = random.randint(1, 10000000)
            order_number = f"ORD-{time.time_ns()}-{i:06d}"
            total_amount = f"{random.uniform(10.0, 5000.0):.2f}"
            order_date = generate_random_date()
            status = random.choice(
                ["pending", "processing", "shipped", "delivered", "cancelled"]
            )
            shipping_address = (
                generate_random_string(random.randint(100, 300))
                .replace("\t", " ")
                .replace("\n", " ")
            )
            notes = (
                generate_random_string(random.randint(50, 200))
                .replace("\t", " ")
                .replace("\n", " ")
                if random.random() > 0.5
                else ""
            )
            payment_method = random.choice(["credit_card", "paypal", "bank_transfer"])
            shipping_method = random.choice(["standard", "express", "overnight"])
            discount_amount = (
                f"{random.uniform(0, 50) if random.random() > 0.7 else 0:.2f}"
            )
            product_list = (
                generate_random_string(random.randint(200, 800))
                .replace("\t", " ")
                .replace("\n", " ")
            )
            customer_notes = (
                generate_random_string(random.randint(100, 400))
                .replace("\t", " ")
                .replace("\n", " ")
            )

            data_buffer.write(
                f"{user_id}\t{order_number}\t{total_amount}\t{order_date}\t{status}\t{shipping_address}\t{notes}\t{payment_method}\t{shipping_method}\t{discount_amount}\t{product_list}\t{customer_notes}\n"
            )

        data_buffer.seek(0)
        cursor.copy_from(
            data_buffer,
            "orders",
            columns=(
                "user_id",
                "order_number",
                "total_amount",
                "order_date",
                "status",
                "shipping_address",
                "notes",
                "payment_method",
                "shipping_method",
                "discount_amount",
                "product_list",
                "customer_notes",
            ),
            sep="\t",
        )

        inserted += current_batch_size
        if inserted % 100000 == 0:
            print_progress(inserted, count, start_time, "Orders")

    elapsed = time.time() - start_time
    print(
        f"Completed orders: {inserted:,} rows in {elapsed:.2f}s ({inserted / elapsed:,.0f} rows/sec)"
    )


def populate_activity_logs_batch(cursor, count=ROWS_PER_TABLE):
    """Populate activity_logs table with specified number of rows."""
    print(f"Inserting {count:,} rows into activity_logs...")

    batch_size = 25000
    inserted = 0
    start_time = time.time()
    actions = [
        "login",
        "logout",
        "view_product",
        "add_to_cart",
        "checkout",
        "update_profile",
        "search",
        "download",
    ]

    while inserted < count:
        current_batch_size = min(batch_size, count - inserted)
        data_buffer = io.StringIO()

        for i in range(current_batch_size):
            user_id = random.randint(1, 10000000)
            action = random.choice(actions)
            details = (
                generate_random_string(random.randint(100, 500))
                .replace("\t", " ")
                .replace("\n", " ")
            )
            ip_address = f"{random.randint(1, 255)}.{random.randint(1, 255)}.{random.randint(1, 255)}.{random.randint(1, 255)}"
            user_agent = f"Mozilla/5.0 ({generate_random_string(50)}) {generate_random_string(30)}".replace(
                "\t", " "
            ).replace("\n", " ")
            timestamp = generate_random_date()
            session_id = generate_random_string(32)
            browser = random.choice(["Chrome", "Firefox", "Safari", "Edge"])
            operating_system = random.choice(
                ["Windows", "macOS", "Linux", "iOS", "Android"]
            )
            referrer = (
                f"https://{generate_random_string(10)}.com"
                if random.random() > 0.3
                else ""
            )
            response_time = random.randint(50, 5000)
            error_message = (
                generate_random_string(random.randint(100, 300))
                .replace("\t", " ")
                .replace("\n", " ")
                if random.random() > 0.8
                else ""
            )

            data_buffer.write(
                f"{user_id}\t{action}\t{details}\t{ip_address}\t{user_agent}\t{timestamp}\t{session_id}\t{browser}\t{operating_system}\t{referrer}\t{response_time}\t{error_message}\n"
            )

        data_buffer.seek(0)
        cursor.copy_from(
            data_buffer,
            "activity_logs",
            columns=(
                "user_id",
                "action",
                "details",
                "ip_address",
                "user_agent",
                "timestamp",
                "session_id",
                "browser",
                "operating_system",
                "referrer",
                "response_time",
                "error_message",
            ),
            sep="\t",
        )

        inserted += current_batch_size
        if inserted % 100000 == 0:
            print_progress(inserted, count, start_time, "Activity logs")

    elapsed = time.time() - start_time
    print(
        f"Completed activity_logs: {inserted:,} rows in {elapsed:.2f}s ({inserted / elapsed:,.0f} rows/sec)"
    )


def get_database_size_bytes(cursor):
    """Get the current database size in bytes."""
    cursor.execute("SELECT pg_database_size('testdb')")
    return cursor.fetchone()[0]


def get_database_size_mb(cursor):
    """Get the current database size in MB."""
    size_bytes = get_database_size_bytes(cursor)
    return size_bytes / (1024 * 1024)


def format_size_mb(size_bytes):
    """Format size in bytes to MB string."""
    return f"{size_bytes / (1024 * 1024):.1f}MB"


def main():
    """Main function to populate the database until reaching 10GB."""
    print("Starting database population for backup performance testing...")
    print(f"Target: {TARGET_SIZE_GB * 1024:.0f}MB of data")
    print(
        f"Strategy: Insert {ROWS_PER_TABLE:,} rows into each of 3 tables per round until target reached"
    )
    print("-" * 80)

    try:
        # Connect to database
        print("Connecting to database...")
        conn = psycopg2.connect(**DB_CONFIG)
        conn.autocommit = False
        cursor = conn.cursor()

        initial_size_mb = get_database_size_mb(cursor)
        print(f"Initial database size: {initial_size_mb:.1f}MB")
        overall_start_time = time.time()

        # Create tables
        create_tables(cursor)
        conn.commit()

        # Drop indexes for faster loading
        drop_indexes(cursor)
        conn.commit()

        round_number = 1
        current_size_bytes = get_database_size_bytes(cursor)

        # Keep adding rounds of ROWS_PER_TABLE rows per table until we reach the 10GB target
        while current_size_bytes < TARGET_SIZE_BYTES:
            print(f"\n{'=' * 20} ROUND {round_number} {'=' * 20}")
            round_start_time = time.time()

            # Populate all 3 tables with ROWS_PER_TABLE rows each
            populate_large_table_batch(cursor)
            conn.commit()

            populate_orders_batch(cursor)
            conn.commit()

            populate_activity_logs_batch(cursor)
            conn.commit()

            # Check current size
            current_size_bytes = get_database_size_bytes(cursor)
            current_size_mb = format_size_mb(current_size_bytes)
            round_elapsed = time.time() - round_start_time

            print(f"\nRound {round_number} completed in {round_elapsed:.2f}s")
            print(f"Current database size: {current_size_mb}")
            print(
                f"Progress: {(current_size_bytes / TARGET_SIZE_BYTES) * 100:.1f}% of target ({TARGET_SIZE_GB * 1024:.0f}MB)"
            )

            if current_size_bytes >= TARGET_SIZE_BYTES:
                print(f"✅ Target size reached!")
                break

            round_number += 1

        # Recreate indexes
        print(f"\n{'=' * 20} CREATING INDEXES {'=' * 20}")
        index_start_time = time.time()
        create_indexes(cursor)
        conn.commit()
        index_elapsed = time.time() - index_start_time
        print(f"Indexes created in {index_elapsed:.2f}s")

        # Final statistics
        overall_elapsed = time.time() - overall_start_time
        final_size_mb = get_database_size_mb(cursor)

        print(f"\n{'=' * 60}")
        print("DATABASE POPULATION COMPLETED!")
        print(f"Final database size: {final_size_mb:.1f}MB")
        print(f"Total rounds: {round_number}")
        print(
            f"Total time: {overall_elapsed:.2f} seconds ({overall_elapsed / 60:.2f} minutes)"
        )

        # Show table statistics
        cursor.execute("""
            SELECT 
                relname,
                n_tup_ins as "Rows",
                round(pg_total_relation_size('public.'||relname)::numeric / (1024*1024), 1) as "Size_MB"
            FROM pg_stat_user_tables 
            ORDER BY pg_total_relation_size('public.'||relname) DESC
        """)

        print(f"\nTable Statistics:")
        print(f"{'Table':<20} | {'Rows':<12} | {'Size':<10}")
        print("-" * 50)
        for row in cursor.fetchall():
            print(f"{row[0]:<20} | {row[1]:>12,} | {row[2]:.1f}MB")

        cursor.close()
        conn.close()

    except psycopg2.Error as e:
        print(f"Database error: {e}")
        sys.exit(1)
    except Exception as e:
        print(f"Error: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()
Analyzing the results
After 21 backup-and-restore cycles we collect the following data: backup time, restore time, total time, and the backup size relative to the original database.

The raw data as CSV:
tool,format,compression_method,compression_level,backup_duration_seconds,restore_duration_seconds,total_duration_seconds,backup_size_bytes,database_size_bytes,restored_db_size_bytes,compression_ratio,backup_success,restore_success,backup_error,restore_error,timestamp
pg_dump,plain,none,0,100.39210295677185,735.2188968658447,835.6109998226166,9792231003,11946069139,11922173075,0.8197031918249641,True,True,,,2025-07-29T09:56:20.611844
pg_dump,custom,none,0,264.56927490234375,406.6467957496643,671.216070652008,6862699613,11946069139,11943709843,0.5744734550878778,True,True,,,2025-07-29T10:07:37.226681
pg_dump,custom,gzip,1,214.07211470603943,383.0168492794037,597.0889639854431,7074031563,11946069139,11943611539,0.5921639562511493,True,True,,,2025-07-29T10:17:39.801883
pg_dump,custom,gzip,5,260.6179132461548,393.76623010635376,654.3841433525085,6866440205,11946069139,11943718035,0.5747865783384196,True,True,,,2025-07-29T10:28:40.167485
pg_dump,custom,gzip,9,272.3802499771118,385.1409020423889,657.5211520195007,6856264586,11946069139,11943619731,0.5739347819121977,True,True,,,2025-07-29T10:39:42.912960
pg_dump,custom,lz4,1,84.0079517364502,379.6986663341522,463.7066180706024,9146843234,11946069139,11943685267,0.765678075990583,True,True,,,2025-07-29T10:47:32.131593
pg_dump,custom,lz4,5,150.24981474876404,393.44346714019775,543.6932818889618,8926348325,11946069139,11943718035,0.7472205477078983,True,True,,,2025-07-29T10:56:41.333595
pg_dump,custom,lz4,12,220.93980932235718,418.26913809776306,639.2089474201202,8923243046,11946069139,11943767187,0.7469606062188722,True,True,,,2025-07-29T11:07:26.574678
pg_dump,custom,zstd,1,87.83108067512512,419.07846903800964,506.90954971313477,6835388679,11946069139,11943767187,0.5721872692570225,True,True,,,2025-07-29T11:15:59.917828
pg_dump,custom,zstd,5,102.42366409301758,413.64263129234314,516.0662953853607,6774137561,11946069139,11944357011,0.567059966100871,True,True,,,2025-07-29T11:24:42.075008
pg_dump,custom,zstd,15,844.7868592739105,388.23959374427795,1233.0264530181885,6726189591,11946069139,11943636115,0.5630462633973209,True,True,,,2025-07-29T11:45:17.885163
pg_dump,custom,zstd,22,5545.566084384918,404.1370210647583,5949.7031054496765,6798947241,11946069139,11943750803,0.5691367731000038,True,True,,,2025-07-29T13:24:30.014902
pg_dump,directory,none,0,114.9900906085968,395.2716040611267,510.2616946697235,6854332396,11946069139,11943693459,0.5737730391684116,True,True,,,2025-07-29T13:33:05.944191
pg_dump,directory,lz4,1,53.48561334609985,384.92091369628906,438.4065270423889,9146095976,11946069139,11943668883,0.7656155233641663,True,True,,,2025-07-29T13:40:30.590719
pg_dump,directory,lz4,5,83.44352841377258,410.42058181762695,493.86411023139954,8925601067,11946069139,11943718035,0.7471579950814815,True,True,,,2025-07-29T13:48:50.201990
pg_dump,directory,lz4,12,114.15110802650452,400.04946303367615,514.2005710601807,8922495788,11946069139,11943758995,0.7468980535924554,True,True,,,2025-07-29T13:57:30.419171
pg_dump,directory,zstd,1,57.22735643386841,414.4600088596344,471.6873652935028,6835014976,11946069139,11943750803,0.5721559867493079,True,True,,,2025-07-29T14:05:28.529630
pg_dump,directory,zstd,5,60.121564865112305,398.27933716773987,458.4009020328522,6773763858,11946069139,11943709843,0.5670286835931563,True,True,,,2025-07-29T14:13:13.472761
pg_dump,directory,zstd,15,372.43965554237366,382.9877893924713,755.427444934845,6725815888,11946069139,11943644307,0.5630149808896062,True,True,,,2025-07-29T14:25:54.580924
pg_dump,directory,zstd,22,2637.47145485878,394.4939453601837,3031.9654002189636,6798573538,11946069139,11943660691,0.5691054905922891,True,True,,,2025-07-29T15:16:29.450828
pg_dump,tar,none,0,126.3212628364563,664.1294028759003,790.4506657123566,9792246784,11946069139,11942759571,0.8197045128452776,True,True,,,2025-07-29T15:29:45.280592
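Before walking through the measurement script, here is a small sketch for slicing the raw CSV above (the filename is a placeholder for whatever the script writes out):
import csv

with open("backup_performance_results.csv") as f:
    rows = [r for r in csv.DictReader(f) if r["backup_success"] == "True"]

fastest  = sorted(rows, key=lambda r: float(r["total_duration_seconds"]))[:5]
smallest = sorted(rows, key=lambda r: float(r["compression_ratio"]))[:5]

print("Fastest backup + restore:")
for r in fastest:
    print(f"  {r['format']} {r['compression_method']}:{r['compression_level']}"
          f" -> {float(r['total_duration_seconds']) / 60:.1f} min")

print("Smallest backups (relative to DB size):")
for r in smallest:
    print(f"  {r['format']} {r['compression_method']}:{r['compression_level']}"
          f" -> {float(r['compression_ratio']) * 100:.0f}%")
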
The measurement script:
#!/usr/bin/env python3
"""
Comprehensive PostgreSQL backup performance testing script.
Tests pg_dump with all possible formats and compression levels.
"""

import subprocess
import time
import os
import shutil
import json
import csv
from datetime import datetime
from pathlib import Path
import psycopg2
import argparse


def log_with_timestamp(message):
    """Print a message with timestamp."""
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(f"[{timestamp}] {message}")


# PostgreSQL binary path
PG_BIN_PATH = Path("./postgresql-17/bin")

# Database connection parameters
DB_CONFIG = {
    "host": "localhost",
    "port": 7000,
    "database": "testdb",
    "user": "postgres",
    "password": "testpassword",
}

# Restore database connection parameters
RESTORE_DB_CONFIG = {
    "host": "localhost",
    "port": 7001,
    "database": "testdb",
    "user": "postgres",
    "password": "testpassword",
}

# Test configurations
PG_DUMP_FORMATS = [
    ("plain", "sql"),
    ("custom", "dump"),
    ("directory", "dir"),
    ("tar", "tar"),
]

COMPRESSION_LEVELS = {
    "gzip": [1, 5, 9],
    "lz4": [1, 5, 12],
    "zstd": [1, 5, 15, 22],
}

# Available compression methods (depends on PostgreSQL version)
COMPRESSION_METHODS = ["none", "gzip", "lz4", "zstd"]

# Results storage
results: list[dict] = []


def ensure_backup_directory():
    """Ensure backup directory exists and is clean."""
    backup_dir = Path("./backups")
    if backup_dir.exists():
        shutil.rmtree(backup_dir)
    backup_dir.mkdir(exist_ok=True)
    return backup_dir


def get_database_size():
    """Get current database size in bytes."""
    try:
        conn = psycopg2.connect(**DB_CONFIG)
        cursor = conn.cursor()
        cursor.execute("SELECT pg_database_size('testdb')")
        size = cursor.fetchone()[0]
        cursor.close()
        conn.close()
        return size
    except Exception as e:
        log_with_timestamp(f"Error getting database size: {e}")
        return 0


def get_restore_database_size():
    """Get restore database size in bytes."""
    try:
        conn = psycopg2.connect(**RESTORE_DB_CONFIG)
        cursor = conn.cursor()
        cursor.execute("SELECT pg_database_size('testdb')")
        size = cursor.fetchone()[0]
        cursor.close()
        conn.close()
        return size
    except Exception as e:
        log_with_timestamp(f"Error getting restore database size: {e}")
        return 0


def clean_restore_database():
    """Clean the restore database by dropping and recreating it."""
    import time

    try:
        # Connect to postgres database to drop/create testdb
        restore_config = RESTORE_DB_CONFIG.copy()
        restore_config["database"] = "postgres"

        conn = psycopg2.connect(**restore_config)
        conn.autocommit = True
        cursor = conn.cursor()

        # Terminate any existing connections to testdb with retry logic
        max_attempts = 5
        for attempt in range(max_attempts):
            try:
                # Get count of active connections first
                cursor.execute("""
                    SELECT COUNT(*)
                    FROM pg_stat_activity
                    WHERE datname = 'testdb' AND pid <> pg_backend_pid()
                """)
                active_connections = cursor.fetchone()[0]

                if active_connections == 0:
                    break

                log_with_timestamp(
                    f"Found {active_connections} active connections to testdb, terminating..."
                )

                # Terminate connections
                cursor.execute("""
                    SELECT pg_terminate_backend(pid)
                    FROM pg_stat_activity
                    WHERE datname = 'testdb' AND pid <> pg_backend_pid()
                """)

                # Fetch all results to ensure the query completes
                terminated_pids = cursor.fetchall()
                log_with_timestamp(f"Terminated {len(terminated_pids)} connections")

                # Wait a bit for connections to actually close
                time.sleep(2)

            except Exception as term_error:
                log_with_timestamp(
                    f"Warning: Error terminating connections (attempt {attempt + 1}): {term_error}"
                )
                if attempt < max_attempts - 1:
                    time.sleep(1)
                else:
                    # Continue anyway, maybe we can still drop the database
                    break

        # Try to drop database with retry logic
        drop_attempts = 3
        for attempt in range(drop_attempts):
            try:
                cursor.execute("DROP DATABASE IF EXISTS testdb")
                log_with_timestamp("Database testdb dropped successfully")
                break
            except Exception as drop_error:
                if (
                    "is being accessed by other users" in str(drop_error)
                    and attempt < drop_attempts - 1
                ):
                    log_with_timestamp(
                        f"Database still in use, waiting and retrying... (attempt {attempt + 1})"
                    )
                    time.sleep(3)
                else:
                    raise drop_error

        # Create fresh database
        cursor.execute("CREATE DATABASE testdb")

        cursor.close()
        conn.close()

        log_with_timestamp("✓ Restore database cleaned and recreated")
        return True

    except Exception as e:
        log_with_timestamp(f"✗ Error cleaning restore database: {e}")
        return False


def wait_for_restore_db():
    """Wait for restore database to be ready."""
    import time

    max_attempts = 30
    for attempt in range(max_attempts):
        try:
            conn = psycopg2.connect(**RESTORE_DB_CONFIG)
            conn.close()
            return True
        except Exception:
            if attempt < max_attempts - 1:
                time.sleep(1)
            else:
                log_with_timestamp("✗ Restore database not ready after 30 seconds")
                return False


def restore_pg_dump(backup_path, format_name):
    """Restore a pg_dump backup."""
    env = os.environ.copy()
    env["PGPASSWORD"] = RESTORE_DB_CONFIG["password"]

    if format_name == "plain":
        # Use psql for plain format
        command = [
            str(PG_BIN_PATH / "psql.exe"),
            "-h",
            RESTORE_DB_CONFIG["host"],
            "-p",
            str(RESTORE_DB_CONFIG["port"]),
            "-U",
            RESTORE_DB_CONFIG["user"],
            "-d",
            RESTORE_DB_CONFIG["database"],
            "-f",
            str(backup_path),
            "-v",
            "ON_ERROR_STOP=1",
        ]
    else:
        # Use pg_restore for custom, directory, tar formats
        command = [
            str(PG_BIN_PATH / "pg_restore.exe"),
            "-h",
            RESTORE_DB_CONFIG["host"],
            "-p",
            str(RESTORE_DB_CONFIG["port"]),
            "-U",
            RESTORE_DB_CONFIG["user"],
            "-d",
            RESTORE_DB_CONFIG["database"],
            "--verbose",
            str(backup_path),
        ]

        # Add parallel processing for custom and directory formats
        if format_name in ["custom", "directory"]:
            command.extend(["-j", "4"])
        else:
            # Only add --single-transaction for tar format (not parallel)
            command.insert(-1, "--single-transaction")

    return run_command(command, env=env)


def get_file_size(filepath):
    """Get file or directory size in bytes."""
    if os.path.isfile(filepath):
        return os.path.getsize(filepath)
    elif os.path.isdir(filepath):
        total_size = 0
        for dirpath, dirnames, filenames in os.walk(filepath):
            for filename in filenames:
                file_path = os.path.join(dirpath, filename)
                total_size += os.path.getsize(file_path)
        return total_size
    return 0


def format_size(size_bytes):
    """Format size in GB with 1 decimal place."""
    size_gb = size_bytes / (1024.0 * 1024.0 * 1024.0)
    return f"{size_gb:.1f} GB"


def format_minutes(seconds):
    """Format time in minutes with 1 decimal place."""
    minutes = seconds / 60.0
    return f"{minutes:.1f} mins"


def run_command(command, timeout=7200, env=None):
    """Run a command and measure execution time."""
    log_with_timestamp(f"Running: {' '.join(command)}")
    start_time = time.time()

    try:
        # Set environment variables for password
        if env is None:
            env = os.environ.copy()
            env["PGPASSWORD"] = DB_CONFIG["password"]

        result = subprocess.run(
            command, capture_output=True, text=True, timeout=timeout, env=env
        )

        end_time = time.time()
        duration = end_time - start_time

        if result.returncode != 0:
            log_with_timestamp(f"Command failed with return code {result.returncode}")
            log_with_timestamp(f"STDERR: {result.stderr}")
            return None, duration, result.stderr

        return result, duration, None

    except subprocess.TimeoutExpired:
        log_with_timestamp(f"Command timed out after {timeout} seconds")
        return None, timeout, "Command timed out"
    except Exception as e:
        end_time = time.time()
        duration = end_time - start_time
        log_with_timestamp(f"Command failed with exception: {e}")
        return None, duration, str(e)


def test_pg_dump(skip_restore=False):
    """Test pg_dump with all format and compression combinations."""
    log_with_timestamp("\n" + "=" * 60)
    log_with_timestamp("TESTING PG_DUMP")
    log_with_timestamp("=" * 60)

    backup_dir = Path("./backups")

    for format_name, extension in PG_DUMP_FORMATS:
        log_with_timestamp(f"\nTesting pg_dump format: {format_name}")

        # Test without compression
        test_name = f"pg_dump_{format_name}_no_compression"
        output_path = backup_dir / f"{test_name}.{extension}"

        command = [
            str(PG_BIN_PATH / "pg_dump.exe"),
            "-h",
            DB_CONFIG["host"],
            "-p",
            str(DB_CONFIG["port"]),
            "-U",
            DB_CONFIG["user"],
            "-d",
            DB_CONFIG["database"],
            "-f",
            str(output_path),
            "--format",
            format_name,
            "--verbose",
        ]

        if format_name == "directory":
            # For directory format, create the directory first
            output_path.mkdir(exist_ok=True)
            # Replace the path after -f flag with the directory path
            for i, item in enumerate(command):
                if item == "-f" and i + 1 < len(command):
                    command[i + 1] = str(output_path)
                    break
            # Add parallel processing for directory format
            command.extend(["-j", "4"])

        # Perform backup
        result, backup_duration, error = run_command(command)

        if result is not None:
            backup_size = get_file_size(output_path)
            db_size = get_database_size()
            compression_ratio = backup_size / db_size if db_size > 0 else 0

            # Clean restore database and perform restore
            restore_success = False
            restore_duration = 0
            restore_error = None
            restored_db_size = 0

            if not skip_restore:
                if clean_restore_database() and wait_for_restore_db():
                    restore_result, restore_duration, restore_error = restore_pg_dump(
                        output_path, format_name
                    )
                    if restore_result is not None and restore_result.returncode == 0:
                        restore_success = True
                        restored_db_size = get_restore_database_size()
            else:
                restore_success = True  # Mark as successful if skipped
                restore_error = "Skipped"

            results.append(
                {
                    "tool": "pg_dump",
                    "format": format_name,
                    "compression_method": "none",
                    "compression_level": 0,
                    "backup_duration_seconds": backup_duration,
                    "restore_duration_seconds": restore_duration,
                    "total_duration_seconds": backup_duration + restore_duration,
                    "backup_size_bytes": backup_size,
                    "database_size_bytes": db_size,
                    "restored_db_size_bytes": restored_db_size,
                    "compression_ratio": compression_ratio,
                    "backup_success": True,
                    "restore_success": restore_success,
                    "backup_error": None,
                    "restore_error": restore_error,
                    "timestamp": datetime.now().isoformat(),
                }
            )

            if skip_restore:
                log_with_timestamp(
                    f"✓ {test_name}: Backup {format_minutes(backup_duration)}, "
                    f"{format_size(backup_size)}, ratio: {compression_ratio:.3f} (restore skipped)"
                )
            else:
                log_with_timestamp(
                    f"✓ {test_name}: Backup {format_minutes(backup_duration)}, Restore {format_minutes(restore_duration)}, "
                    f"{format_size(backup_size)}, ratio: {compression_ratio:.3f}"
                )
        else:
            results.append(
                {
                    "tool": "pg_dump",
                    "format": format_name,
                    "compression_method": "none",
                    "compression_level": 0,
                    "backup_duration_seconds": backup_duration,
                    "restore_duration_seconds": 0,
                    "total_duration_seconds": backup_duration,
                    "backup_size_bytes": 0,
                    "database_size_bytes": get_database_size(),
                    "restored_db_size_bytes": 0,
                    "compression_ratio": 0,
                    "backup_success": False,
                    "restore_success": False,
                    "backup_error": error,
                    "restore_error": "Backup failed",
                    "timestamp": datetime.now().isoformat(),
                }
            )
            log_with_timestamp(f"✗ {test_name}: BACKUP FAILED - {error}")

        # Test with compression (only for formats that support it)
        if format_name in ["custom", "directory"]:
            for compression_method in COMPRESSION_METHODS[1:]:  # Skip 'none'
                if compression_method == "gzip" and format_name == "directory":
                    continue  # gzip is not tested for the directory format in this run

                compression_levels = COMPRESSION_LEVELS.get(compression_method, [1])

                for level in compression_levels:
                    test_name = (
                        f"pg_dump_{format_name}_{compression_method}_level_{level}"
                    )
                    output_path = backup_dir / f"{test_name}.{extension}"

                    command = [
                        str(PG_BIN_PATH / "pg_dump.exe"),
                        "-h",
                        DB_CONFIG["host"],
                        "-p",
                        str(DB_CONFIG["port"]),
                        "-U",
                        DB_CONFIG["user"],
                        "-d",
                        DB_CONFIG["database"],
                        "-f",
                        str(output_path),
                        "--format",
                        format_name,
                        "--verbose",
                    ]

                    # Add compression options
                    if format_name == "custom":
                        if compression_method == "gzip":
                            command.extend(["-Z", str(level)])
                        else:
                            command.extend(
                                [
                                    "--compress",
                                    f"{compression_method}:{level}",
                                ]
                            )
                    elif format_name == "directory":
                        output_path.mkdir(exist_ok=True)
                        # For directory format, replace the path after -f
                        for i, item in enumerate(command):
                            if item == "-f" and i + 1 < len(command):
                                command[i + 1] = str(output_path)
                                break
                        if (
                            compression_method != "gzip"
                        ):  # Directory format supports lz4 and zstd
                            command.extend(
                                [
                                    "--compress",
                                    f"{compression_method}:{level}",
                                ]
                            )
                        # Add parallel processing for directory format
                        command.extend(["-j", "4"])

                    # Perform backup
                    result, backup_duration, error = run_command(command)

                    if result is not None:
                        backup_size = get_file_size(output_path)
                        db_size = get_database_size()
                        compression_ratio = backup_size / db_size if db_size > 0 else 0

                        # Clean restore database and perform restore
                        restore_success = False
                        restore_duration = 0
                        restore_error = None
                        restored_db_size = 0

                        if not skip_restore:
                            if clean_restore_database() and wait_for_restore_db():
                                restore_result, restore_duration, restore_error = (
                                    restore_pg_dump(output_path, format_name)
                                )
                                if (
                                    restore_result is not None
                                    and restore_result.returncode == 0
                                ):
                                    restore_success = True
                                    restored_db_size = get_restore_database_size()
                        else:
                            restore_success = True  # Mark as successful if skipped
                            restore_error = "Skipped"

                        results.append(
                            {
                                "tool": "pg_dump",
                                "format": format_name,
                                "compression_method": compression_method,
                                "compression_level": level,
                                "backup_duration_seconds": backup_duration,
                                "restore_duration_seconds": restore_duration,
                                "total_duration_seconds": backup_duration
                                + restore_duration,
                                "backup_size_bytes": backup_size,
                                "database_size_bytes": db_size,
                                "restored_db_size_bytes": restored_db_size,
                                "compression_ratio": compression_ratio,
                                "backup_success": True,
                                "restore_success": restore_success,
                                "backup_error": None,
                                "restore_error": restore_error,
                                "timestamp": datetime.now().isoformat(),
                            }
                        )

                        if skip_restore:
                            log_with_timestamp(
                                f"✓ {test_name}: Backup {format_minutes(backup_duration)}, "
                                f"{format_size(backup_size)}, ratio: {compression_ratio:.3f} (restore skipped)"
                            )
                        else:
                            log_with_timestamp(
                                f"✓ {test_name}: Backup {format_minutes(backup_duration)}, Restore {format_minutes(restore_duration)}, "
                                f"{format_size(backup_size)}, ratio: {compression_ratio:.3f}"
                            )
                    else:
                        results.append(
                            {
                                "tool": "pg_dump",
                                "format": format_name,
                                "compression_method": compression_method,
                                "compression_level": level,
                                "backup_duration_seconds": backup_duration,
                                "restore_duration_seconds": 0,
                                "total_duration_seconds": backup_duration,
                                "backup_size_bytes": 0,
                                "database_size_bytes": get_database_size(),
                                "restored_db_size_bytes": 0,
                                "compression_ratio": 0,
                                "backup_success": False,
                                "restore_success": False,
                                "backup_error": error,
                                "restore_error": "Backup failed",
                                "timestamp": datetime.now().isoformat(),
                            }
                        )
                        log_with_timestamp(f"✗ {test_name}: BACKUP FAILED - {error}")


def save_tabular_results():
    """Save tabular test results to a CSV file."""
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

    # Save as tabular CSV
    csv_file = f"backup_performance_tabular_{timestamp}.csv"

    with open(csv_file, "w", newline="") as f:
        writer = csv.writer(f)

        # Write header
        writer.writerow(
            [
                "format",
                "backup duration mins",
                "restore duration mins",
                "total duration mins",
                "backup size GB",
                "db size GB",
                "restored DB size GB",
                "db size % from original",
            ]
        )

        # Write data rows
        for r in sorted(
            results,
            key=lambda x: (
                x["format"],
                x["compression_method"],
                x["compression_level"],
            ),
        ):
            if not r["backup_success"]:
                continue

            format_str = f"{r['format']} {r['compression_method']}"
            if r["compression_method"] != "none":
                format_str += f" {r['compression_level']}"

            backup_mins = round(r["backup_duration_seconds"] / 60.0, 1)
            restore_mins = round(r["restore_duration_seconds"] / 60.0, 1)
            total_mins = round(backup_mins + restore_mins, 1)

            backup_gb = round(r["backup_size_bytes"] / (1024.0 * 1024.0 * 1024.0), 1)
            db_gb = round(r["database_size_bytes"] / (1024.0 * 1024.0 * 1024.0), 1)
            restored_db_gb = round(
                r["restored_db_size_bytes"] / (1024.0 * 1024.0 * 1024.0), 1
            )

            db_size_percent = round((backup_gb / db_gb) * 100) if db_gb > 0 else 0

            writer.writerow(
                [
                    format_str,
                    backup_mins,
                    restore_mins,
                    total_mins,
                    backup_gb,
                    db_gb,
                    restored_db_gb,
                    db_size_percent,
                ]
            )

    print(f"Tabular results saved to {csv_file}")


def save_results():
    """Save test results to JSON and CSV files."""
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

    # Save as JSON
    json_file = f"backup_performance_results_{timestamp}.json"
    with open(json_file, "w") as f:
        json.dump(results, f, indent=2)
    print(f"\nResults saved to {json_file}")

    # Save as CSV
    csv_file = f"backup_performance_results_{timestamp}.csv"
    if results:
        fieldnames = results[0].keys()
        with open(csv_file, "w", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(results)
        print(f"Results saved to {csv_file}")

    # Save tabular results
    save_tabular_results()


def print_summary():
    """Print a summary of test results."""
    print("\n" + "=" * 80)
    print("PERFORMANCE TEST SUMMARY")
    print("=" * 80)

    if not results:
        print("No results to display.")
        return

    # Check if any tests had restore attempts
    restore_attempted = any(
        r["restore_error"] != "Skipped" for r in results if r["backup_success"]
    )

    if restore_attempted:
        successful_tests = [
            r for r in results if r["backup_success"] and r["restore_success"]
        ]
        backup_only_success = [
            r
            for r in results
            if r["backup_success"]
            and not r["restore_success"]
            and r["restore_error"] != "Skipped"
        ]
        failed_tests = [r for r in results if not r["backup_success"]]

        print(f"Total tests: {len(results)}")
        print(f"Complete success (backup + restore): {len(successful_tests)}")
        print(f"Backup only success: {len(backup_only_success)}")
        print(f"Complete failures: {len(failed_tests)}")

        if successful_tests:
            print(f"\nBest compression ratios (complete success):")
            best_compression = sorted(
                successful_tests, key=lambda x: x["compression_ratio"]
            )[:5]
            for test in best_compression:
                print(
                    f"  {test['tool']} {test['format']} {test['compression_method']}:{test['compression_level']} - "
                    f"Ratio: {test['compression_ratio']:.3f}, Backup: {format_minutes(test['backup_duration_seconds'])}, "
                    f"Restore: {format_minutes(test['restore_duration_seconds'])}, Total: {format_minutes(test['total_duration_seconds'])}"
                )

            print(f"\nFastest total time (backup + restore):")
            fastest_total = sorted(
                successful_tests, key=lambda x: x["total_duration_seconds"]
            )[:5]
            for test in fastest_total:
                print(
                    f"  {test['tool']} {test['format']} {test['compression_method']}:{test['compression_level']} - "
                    f"Total: {format_minutes(test['total_duration_seconds'])}, Ratio: {test['compression_ratio']:.3f}"
                )

            print(f"\nFastest backup times:")
            fastest_backup = sorted(
                successful_tests, key=lambda x: x["backup_duration_seconds"]
            )[:5]
            for test in fastest_backup:
                print(
                    f"  {test['tool']} {test['format']} {test['compression_method']}:{test['compression_level']} - "
                    f"Backup: {format_minutes(test['backup_duration_seconds'])}, Restore: {format_minutes(test['restore_duration_seconds'])}"
                )

            print(f"\nFastest restore times:")
            fastest_restore = sorted(
                successful_tests, key=lambda x: x["restore_duration_seconds"]
            )[:5]
            for test in fastest_restore:
                print(
                    f"  {test['tool']} {test['format']} {test['compression_method']}:{test['compression_level']} - "
                    f"Restore: {format_minutes(test['restore_duration_seconds'])}, Backup: {format_minutes(test['backup_duration_seconds'])}"
                )

        if backup_only_success:
            print(f"\nBackup-only successes (restore failed):")
            for test in backup_only_success:
                print(
                    f"  {test['tool']} {test['format']} {test['compression_method']}:{test['compression_level']} - "
                    f"Backup: {format_minutes(test['backup_duration_seconds'])}, Restore Error: {test['restore_error']}"
                )

        if failed_tests:
            print(f"\nComplete failures:")
            for test in failed_tests:
                print(
                    f"  {test['tool']} {test['format']} {test['compression_method']}:{test['compression_level']} - "
                    f"Backup Error: {test['backup_error']}"
                )
    else:
        # Restore was skipped for all tests
        successful_tests = [r for r in results if r["backup_success"]]
        failed_tests = [r for r in results if not r["backup_success"]]

        print(f"Total tests: {len(results)}")
        print(f"Successful backups: {len(successful_tests)}")
        print(f"Failed backups: {len(failed_tests)}")
        print("Note: Restore tests were skipped")

        if successful_tests:
            print(f"\nBest compression ratios:")
            best_compression = sorted(
                successful_tests, key=lambda x: x["compression_ratio"]
            )[:5]
            for test in best_compression:
                print(
                    f"  {test['tool']} {test['format']} {test['compression_method']}:{test['compression_level']} - "
                    f"Ratio: {test['compression_ratio']:.3f}, Backup: {format_minutes(test['backup_duration_seconds'])}"
                )

            print(f"\nFastest backup times:")
            fastest_backup = sorted(
                successful_tests, key=lambda x: x["backup_duration_seconds"]
            )[:5]
            for test in fastest_backup:
                print(
                    f"  {test['tool']} {test['format']} {test['compression_method']}:{test['compression_level']} - "
                    f"Backup: {format_minutes(test['backup_duration_seconds'])}, Ratio: {test['compression_ratio']:.3f}"
                )

        if failed_tests:
            print(f"\nFailed backups:")
            for test in failed_tests:
                print(
                    f"  {test['tool']} {test['format']} {test['compression_method']}:{test['compression_level']} - "
                    f"Backup Error: {test['backup_error']}"
                )


def print_tabular_report():
    """Print a tabular report of results similar to the requested format."""
    if not results:
        print("No results to display.")
        return

    print("\n" + "=" * 120)
    print("TABULAR PERFORMANCE REPORT")
    print("=" * 120)

    # Print header
    print(
        f"{'format':<20}{'backup duration mins':<20}{'restore duration mins':<25}{'total duration mins':<20}"
        f"{'backup size GB':<15}{'db size GB':<15}{'restored DB size GB':<20}{'db size % from original':<25}"
    )
    print("-" * 120)

    # Sort results by format, then by compression method and level
    for r in sorted(
        results,
        key=lambda x: (x["format"], x["compression_method"], x["compression_level"]),
    ):
        if not r["backup_success"]:
            continue

        format_str = f"{r['format']} {r['compression_method']}"
        if r["compression_method"] != "none":
            format_str += f" {r['compression_level']}"

        backup_mins = r["backup_duration_seconds"] / 60.0
        restore_mins = r["restore_duration_seconds"] / 60.0
        total_mins = backup_mins + restore_mins

        backup_gb = r["backup_size_bytes"] / (1024.0 * 1024.0 * 1024.0)
        db_gb = r["database_size_bytes"] / (1024.0 * 1024.0 * 1024.0)
        restored_db_gb = r["restored_db_size_bytes"] / (1024.0 * 1024.0 * 1024.0)

        db_size_percent = (backup_gb / db_gb) * 100 if db_gb > 0 else 0

        print(
            f"{format_str:<20}{backup_mins:<20.1f}{restore_mins:<25.1f}{total_mins:<20.1f}"
            f"{backup_gb:<15.1f}{db_gb:<15.1f}{restored_db_gb:<20.1f}{db_size_percent:.0f}"
        )


def main():
    """Main function to run pg_dump backup performance tests."""
    parser = argparse.ArgumentParser(
        description="PostgreSQL pg_dump backup performance testing"
    )
    parser.add_argument(
        "--skip-gzip", action="store_true", help="Skip gzip compression tests"
    )
    parser.add_argument(
        "--skip-lz4", action="store_true", help="Skip lz4 compression tests"
    )
    parser.add_argument(
        "--skip-zstd", action="store_true", help="Skip zstd compression tests"
    )
    parser.add_argument(
        "--skip-restore", action="store_true", help="Skip restore tests"
    )

    args = parser.parse_args()

    # Filter compression methods based on skip flags
    global COMPRESSION_METHODS
    filtered_compression_methods = ["none"]  # Always include 'none'

    if not args.skip_gzip:
        filtered_compression_methods.append("gzip")
    if not args.skip_lz4:
        filtered_compression_methods.append("lz4")
    if not args.skip_zstd:
        filtered_compression_methods.append("zstd")

    COMPRESSION_METHODS = filtered_compression_methods

    log_with_timestamp("PostgreSQL pg_dump Backup Performance Testing")
    log_with_timestamp("=" * 60)
    log_with_timestamp(
        f"Source Database: {DB_CONFIG['database']} on {DB_CONFIG['host']}:{DB_CONFIG['port']}"
    )
    log_with_timestamp(
        f"Restore Database: {RESTORE_DB_CONFIG['database']} on {RESTORE_DB_CONFIG['host']}:{RESTORE_DB_CONFIG['port']}"
    )
    log_with_timestamp(f"Database size: {format_size(get_database_size())}")
    log_with_timestamp(f"Compression methods: {', '.join(COMPRESSION_METHODS)}")
    log_with_timestamp(
        f"Restore tests: {'Skipped' if args.skip_restore else 'Enabled'}"
    )
    log_with_timestamp(f"Test started: {datetime.now()}")

    # Ensure backup directory is ready
    backup_dir = ensure_backup_directory()
    log_with_timestamp(f"Backup directory: {backup_dir.absolute()}")

    try:
        # Run pg_dump tests only
        test_pg_dump(skip_restore=args.skip_restore)

        # Save and display results
        save_results()
        print_summary()
        print_tabular_report()

    except KeyboardInterrupt:
        log_with_timestamp("\nTest interrupted by user")
        if results:
            save_results()
            print_summary()
            print_tabular_report()
    except Exception as e:
        log_with_timestamp(f"\nTest failed with error: {e}")
        if results:
            save_results()
            print_summary()
            print_tabular_report()

    log_with_timestamp(f"\nTest completed: {datetime.now()}")


if __name__ == "__main__":
    main()
Results for zstd at levels 15 and 22 are excluded from the visualizations: their long compression times skew the charts without bringing any noticeable gain in compression.

Charts (a plotting sketch follows the list):
  • Backup time (seconds, lower is better).
  • Restore time (seconds, lower is better).
  • Total time (seconds, lower is better).
  • Backup size (% of the original database, lower is better).
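If you want to rebuild these charts yourself, here is a minimal plotting sketch. It is an assumption that save_results() writes the list of result dicts to backup_performance_results.json (the file name is made up); the field names match the ones used in the reporting code above, and matplotlib is the only extra dependency.
# A sketch of how the four charts could be rebuilt from the saved results.
# The JSON file name is an assumption; zstd levels 15 and 22 are dropped, as in the text above.
import json

import matplotlib.pyplot as plt

with open("backup_performance_results.json") as f:
    results = json.load(f)

# Keep fully successful runs and drop the slow zstd 15/22 outliers.
rows = [
    r
    for r in results
    if r["backup_success"]
    and r.get("restore_success")
    and not (r["compression_method"] == "zstd" and r["compression_level"] in (15, 22))
]
labels = [
    f"{r['format']} {r['compression_method']}:{r['compression_level']}" for r in rows
]

metrics = [
    ("backup_duration_seconds", "Backup time, seconds (lower is better)"),
    ("restore_duration_seconds", "Restore time, seconds (lower is better)"),
    ("total_duration_seconds", "Total time, seconds (lower is better)"),
    ("compression_ratio", "Backup size relative to the database (lower is better)"),
]
fig, axes = plt.subplots(len(metrics), 1, figsize=(12, 18))
for ax, (key, title) in zip(axes, metrics):
    ax.barh(labels, [r[key] for r in rows])
    ax.set_title(title)
fig.tight_layout()
fig.savefig("pg_dump_charts.png")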
Recommendations based on the data
Since the test was run on synthetic data, real-world results may differ, but the main trends should hold. On synthetic data the speed gap between Plain, Custom and Directory is modest: Custom is ahead of Plain by roughly 30%, and Directory is ahead of Custom by about 20%. Presumably the lack of independent tables and objects in the test dataset limited the potential gap.

Let's highlight the recommendations:

  • Fastest format: Directory. It outpaces Custom by more than 2x when creating backups, which matters most for frequent backups. Custom beats Plain and Tar on total time, and Directory beats Custom once parallelism is enabled (see the command sketch after this list).
  • Best balance of speed and compression: zstd at level 5. Only the uncompressed formats create backups faster, and it is only ~4% slower on restore. Its compression is comparable to gzip 9 while being ~18% faster, within the error margin of the synthetic data.
  • zstd 15 and 22: less suitable for this scenario. They deliver compression comparable to gzip 9, but the time grows by 2x and 8x respectively. These levels may pay off for databases of 1 TB and above with long retention periods.
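For reference, here is a minimal sketch of what these recommendations look like as pg_dump/pg_restore invocations, written as subprocess calls in the spirit of the test script. It assumes PostgreSQL 16+ client tools (where pg_dump accepts --compress=zstd:N), the two instances from the Docker Compose setup on ports 7000 and 7001, and illustrative paths and job counts.
# A sketch, not the exact commands from the test script above; paths and -j values are illustrative.
import os
import subprocess

# Password from the Docker Compose file, merged into the current environment.
env = {**os.environ, "PGPASSWORD": "testpassword"}

# Fastest backups: Directory format with parallel workers (-j). Produces a folder, not a single file.
subprocess.run(
    ["pg_dump", "-h", "localhost", "-p", "7000", "-U", "postgres",
     "-Fd", "-j", "8", "--compress=zstd:5", "-f", "backups/testdb_dir", "testdb"],
    check=True, env=env,
)

# Parallel restore into the second instance; -j works for both Directory and Custom archives.
subprocess.run(
    ["pg_restore", "-h", "localhost", "-p", "7001", "-U", "postgres",
     "-d", "testdb", "-j", "8", "--clean", "--if-exists", "backups/testdb_dir"],
    check=True, env=env,
)

# Best single-file balance: Custom format with zstd level 5.
subprocess.run(
    ["pg_dump", "-h", "localhost", "-p", "7000", "-U", "postgres",
     "-Fc", "--compress=zstd:5", "-f", "backups/testdb.dump", "testdb"],
    check=True, env=env,
)
Note that pg_dump runs parallel jobs (-j) only with the Directory format, while pg_restore can parallelize restores from both Custom and Directory archives.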
Final conclusions
The testing showed that for this task the optimal choice is the Custom format with zstd at level 5: it combines high speed with near-best compression and keeps the data in a single file.

After switching from gzip 6 to zstd 5 on a real database, the backup shrank to almost half its previous size while the creation time stayed practically the same. For example, a 4.7 GB database compressed down to 276 MB (a 17x reduction!), which is a different picture from the synthetic data.
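In practice the switch is a single flag change on the pg_dump invocation. A purely illustrative before/after (the database name and output path are made up); on client tools older than PostgreSQL 16 the gzip variant would be spelled "-Z", "6" instead:
# Illustrative only: the same Custom-format dump before and after the compression swap.
before = ["pg_dump", "-Fc", "--compress=gzip:6", "-f", "backup.dump", "proddb"]
after = ["pg_dump", "-Fc", "--compress=zstd:5", "-f", "backup.dump", "proddb"]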

We hope this analysis proves useful to developers of backup tooling and to anyone who regularly uses pg_dump in automation.
gzip at the bottom, zstd at the top
Check out the Слёрм course on working with PostgreSQL and optimizing SQL queries: you will learn how to set up replication, work with backups, and organize monitoring of the whole system.
PostgreSQL База