
The Ultimate Summary: A Complete Guide to Database Optimization!

Hi everyone, I'm 岳哥.

This is the third article in the 《最强总结》 (Ultimate Summary) series. If you haven't read the earlier installments, you can find them here:

The Ultimate Summary: A Complete Guide to Database Window Functions!

The Ultimate Summary: A Complete Guide to SQL Server/MySQL/Oracle Functions!

Database optimization is a key part of improving application performance. This article systematically walks through database optimization methods and practical experience across several dimensions.

These summaries were compiled and saved earlier and are now being published in a batch. If you find them helpful, please like, share, and tap "Wow" (点赞+转发+在看) so 岳哥 stays motivated to keep publishing. You can also leave a comment about which topics you'd like summarized next.

I. SQL Query Optimization

II. Database Configuration Optimization

III. Table Structure Optimization

IV. Partitioning and Table Sharding

V. Caching Optimization

VI. Monitoring and Maintenance

VII. Security Optimization

If you'd like the PDF version, reply "数据库优化" in the backend of the official account.

I. SQL Query Optimization

1. Index Optimization

-- 1. 创建合适的索引

CREATE INDEX idx_user_email ON users(email);

CREATE INDEX idx_order_status_date ON orders(status, created_at);

-- 2. 联合索引优化

-- 不推荐:创建多个单列索引

CREATE INDEX idx_name ON users(name);

CREATE INDEX idx_email ON users(email);

CREATE INDEX idx_phone ON users(phone);

-- 推荐:创建合适的联合索引

CREATE INDEX idx_users_search ON users(name, email, phone);

-- 3. 索引排序优化

-- 考虑排序方向的索引

CREATE INDEX idx_products_category_price ON products(category ASC, price DESC);

-- 4. 前缀索引

-- 对于长字符串字段,使用前缀索引

CREATE INDEX idx_description ON products(description(50));

-- 5. 覆盖索引

-- 创建包含所有查询所需字段的索引

CREATE INDEX idx_orders_covering ON orders(order_id, status, created_at, total_amount);

-- 使用覆盖索引的查询

EXPLAIN SELECT order_id, status, created_at, total_amount

FROM orders

WHERE status = 'pending'

ORDER BY created_at DESC;

-- 6. 避免索引失效

-- 不推荐:使用函数导致索引失效

SELECT * FROM users WHERE YEAR(created_at) = 2024;

-- 推荐:改写为范围查询

SELECT * FROM users

WHERE created_at >= '2024-01-01'

AND created_at < '2025-01-01';
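To confirm that the range rewrite above can actually use an index, a quick check with EXPLAIN and the sys schema helps. This is a minimal sketch; the idx_users_created_at index is an assumption, not part of the original schema.

-- Assumed index on the filter column (skip if an equivalent index already exists)
CREATE INDEX idx_users_created_at ON users(created_at);

-- The range form should show type=range and the created_at index in the plan
EXPLAIN SELECT * FROM users
WHERE created_at >= '2024-01-01'
  AND created_at < '2025-01-01';

-- Indexes that have never been used since server start (requires the sys schema)
SELECT * FROM sys.schema_unused_indexes
WHERE object_schema = DATABASE();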

2. Query Rewriting

-- 1. 避免SELECT *

-- 不推荐

SELECT * FROM orders JOIN users ON orders.user_id = users.id;

-- 推荐

SELECT o.order_id, o.status, u.name, u.email

FROM orders o

JOIN users u ON o.user_id = u.id;

-- 2. 使用EXISTS替代IN

-- 不推荐

SELECT * FROM orders

WHERE user_id IN (SELECT id FROM users WHERE status = 'active');

-- 推荐

SELECT * FROM orders o

WHERE EXISTS (

SELECT 1 FROM users u

WHERE u.id = o.user_id AND u.status = 'active'

);

-- 3. 使用UNION ALL替代UNION

-- 不推荐

SELECT order_id FROM completed_orders

UNION

SELECT order_id FROM pending_orders;

-- 推荐

SELECT order_id FROM completed_orders

UNION ALL

SELECT order_id FROM pending_orders;

-- 4. 使用JOIN替代子查询

-- 不推荐

SELECT *,

(SELECT name FROM users WHERE users.id = orders.user_id) as user_name

FROM orders;

-- 推荐

SELECT o.*, u.name as user_name

FROM orders o

LEFT JOIN users u ON o.user_id = u.id;

-- 5. 分页优化

-- 不推荐

SELECT * FROM orders ORDER BY created_at DESC LIMIT 1000000, 10;

-- 推荐

SELECT * FROM orders

WHERE created_at < (

SELECT created_at FROM orders

ORDER BY created_at DESC

LIMIT 1 OFFSET 999999

)

ORDER BY created_at DESC

LIMIT 10;

-- 6. 使用临时表优化复杂查询

-- 创建临时表存储中间结果

CREATE TEMPORARY TABLE tmp_order_stats AS

SELECT user_id, COUNT(*) as order_count, SUM(amount) as total_amount

FROM orders

GROUP BY user_id;

-- 使用临时表

SELECT u.*, t.order_count, t.total_amount

FROM users u

LEFT JOIN tmp_order_stats t ON u.id = t.user_id;
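For very deep pages, another option besides the subquery above is keyset (seek) pagination, where the client passes back the sort key of the last row it received. A minimal sketch, assuming an index on orders.created_at; the timestamp literal is only a placeholder for the value remembered from the previous page:

-- Continue from the last row the client saw instead of skipping a huge OFFSET
SELECT order_id, status, created_at
FROM orders
WHERE created_at < '2024-05-01 12:34:56'  -- taken from the last row of the previous page
ORDER BY created_at DESC
LIMIT 10;

If created_at is not unique, the cursor can be extended to the pair (created_at, order_id) so that no rows are skipped or repeated across pages.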

II. Database Configuration Optimization

1. Memory Configuration

# MySQL配置文件 my.cnf

# 1. InnoDB缓冲池配置

innodb_buffer_pool_size = 12G # 总内存的50-75%

innodb_buffer_pool_instances = 8 # CPU核心数相当

innodb_buffer_pool_chunk_size = 128M # 默认值,通常不需要修改

# 2. 查询缓存配置(MySQL 8.0已移除)

query_cache_type = 0 # 禁用查询缓存

query_cache_size = 0 # 设置为0

# 3. 排序缓冲区

sort_buffer_size = 2M # 每个会话的排序缓冲区

join_buffer_size = 2M # 每个会话的连接缓冲区

# 4. 临时表配置

tmp_table_size = 64M # 内存临时表大小

max_heap_table_size = 64M # 用户创建的内存表大小

# 5. 连接缓冲区

max_connections = 1000 # 最大连接数

thread_cache_size = 128 # 线程缓存大小

# 6. InnoDB日志配置

innodb_log_file_size = 1G # 重做日志文件大小

innodb_log_files_in_group = 2 # 重做日志文件组数

innodb_log_buffer_size = 16M # 日志缓冲区大小

# 7. 性能监控配置

performance_schema = ON

performance_schema_max_table_instances = 1000

# 查看当前内存使用情况的SQL

SELECT

event_name AS memory_type,

current_number_of_bytes_used AS current_bytes,

high_number_of_bytes_used AS max_bytes

FROM performance_schema.memory_summary_global_by_event_name

WHERE event_name LIKE 'memory/%'

ORDER BY current_number_of_bytes_used DESC;

# 查看InnoDB缓冲池状态

SHOW ENGINE INNODB STATUS;

# 查看全局变量

SHOW VARIABLES LIKE 'innodb_buffer_pool_size';

# 检查缓冲池命中率

SELECT

(1 - (

SELECT variable_value

FROM performance_schema.global_status

WHERE variable_name = 'Innodb_buffer_pool_reads'

) / (

SELECT variable_value

FROM performance_schema.global_status

WHERE variable_name = 'Innodb_buffer_pool_read_requests'

)) * 100 as buffer_pool_hit_ratio;
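Since MySQL 5.7 the buffer pool can also be resized online, which is handy when tuning the 50-75% guideline above on a running instance. A small sketch; the 8G value is only an example:

-- Resize online; the value is rounded to a multiple of
-- innodb_buffer_pool_chunk_size * innodb_buffer_pool_instances
SET GLOBAL innodb_buffer_pool_size = 8 * 1024 * 1024 * 1024;

-- Follow the progress of the resize operation
SHOW STATUS LIKE 'Innodb_buffer_pool_resize_status';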

2. Disk I/O Optimization

# MySQL配置文件 my.cnf

# 1. InnoDB文件I/O配置

innodb_file_per_table = ON # 每个表使用独立的表空间文件

innodb_flush_method = O_DIRECT # 使用直接I/O

innodb_io_capacity = 2000 # SSD磁盘I/O能力

innodb_io_capacity_max = 4000 # 最大I/O能力

# 2. 日志写入配置

innodb_flush_log_at_trx_commit = 1 # 事务提交时写入磁盘

sync_binlog = 1 # 每次事务同步二进制日志

# 3. 双写缓冲区配置

innodb_doublewrite = ON # 启用双写缓冲

innodb_doublewrite_batch_size = 32 # 批处理大小

# 4. 读写分离配置

innodb_read_io_threads = 8 # 读线程数

innodb_write_io_threads = 8 # 写线程数

# 5. 预读配置

innodb_read_ahead_threshold = 56 # 预读阈值

innodb_random_read_ahead = OFF # 随机预读

# 监控I/O性能的SQL查询

SELECT

event_name,

count_star as count,

sum_timer_wait/1000000000 as total_ms,

avg_timer_wait/1000000000 as avg_ms,

max_timer_wait/1000000000 as max_ms

FROM performance_schema.events_waits_summary_global_by_event_name

WHERE event_name LIKE '%io%'

ORDER BY sum_timer_wait DESC;

# 检查InnoDB I/O状态

SHOW ENGINE INNODB STATUS;

# 查看文件I/O情况

SELECT

file_name,

event_name,

count_read,

count_write,

sum_number_of_bytes_read,

sum_number_of_bytes_write

FROM performance_schema.file_summary_by_instance

WHERE event_name LIKE '%innodb%'

ORDER BY sum_number_of_bytes_read + sum_number_of_bytes_write DESC;

# 检查慢查询

SELECT

query,

exec_count,

total_latency,

avg_latency,

rows_sent_avg,

rows_examined_avg

FROM sys.statements_with_runtimes_in_95th_percentile;
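The flush settings above favour durability. For a one-off bulk load on a replica or staging instance, some teams temporarily relax them and restore the defaults afterwards; a hedged sketch of that trade-off (these values should not be left in place on a primary):

-- Trade durability for write throughput during a controlled bulk load
SET GLOBAL innodb_flush_log_at_trx_commit = 2;  -- flush redo to OS cache on each commit
SET GLOBAL sync_binlog = 0;                     -- let the OS schedule binlog syncs
-- ... run the bulk load here ...
SET GLOBAL innodb_flush_log_at_trx_commit = 1;  -- restore full durability
SET GLOBAL sync_binlog = 1;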

3. Concurrency Configuration

# MySQL并发配置

# 1. 连接管理

max_connections = 1000 # 最大连接数

max_user_connections = 800 # 每个用户最大连接数

thread_cache_size = 128 # 线程缓存大小

wait_timeout = 600 # 非交互连接超时时间

interactive_timeout = 1800 # 交互连接超时时间

# 2. 锁配置

innodb_lock_wait_timeout = 50 # 锁等待超时时间

innodb_deadlock_detect = ON # 死锁检测

innodb_autoinc_lock_mode = 2 # 自增锁模式

# 3. 事务配置

transaction_isolation = REPEATABLE-READ # 事务隔离级别

innodb_rollback_segments = 128 # 回滚段数量

# 监控并发情况的SQL

# 检查当前连接数

SELECT COUNT(*) AS connections,

USER AS user,

HOST AS host

FROM INFORMATION_SCHEMA.PROCESSLIST

GROUP BY user, host;

# 检查锁等待情况

SELECT

r.trx_id waiting_trx_id,

r.trx_state waiting_trx_state,

b.trx_id blocking_trx_id,

b.trx_state blocking_trx_state,

TIMESTAMPDIFF(SECOND, r.trx_wait_started, NOW()) wait_time

FROM information_schema.innodb_lock_waits w

INNER JOIN information_schema.innodb_trx b ON b.trx_id = w.blocking_trx_id

INNER JOIN information_schema.innodb_trx r ON r.trx_id = w.requesting_trx_id;

# 检查事务状态

SELECT * FROM information_schema.innodb_trx

WHERE trx_state != 'RUNNING';

# 检查表锁情况

SHOW OPEN TABLES WHERE in_use > 0;
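Note that the information_schema.innodb_lock_waits table used above was removed in MySQL 8.0 (innodb_trx is still there); on 8.0 the sys schema exposes roughly the same picture in one view. A minimal equivalent:

-- MySQL 8.0: who is waiting on whom, with the blocking statement text
SELECT wait_started, waiting_pid, waiting_query, blocking_pid, blocking_query
FROM sys.innodb_lock_waits;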

III. Table Structure Optimization

1. Data Type Optimization

-- 1. 整数类型优化

-- 不推荐

CREATE TABLE products (

id INT, -- 范围过大

status TINYINT, -- 未声明是否有符号

quantity INT, -- 范围过大

category_id INT -- 范围过大

);

-- 推荐

CREATE TABLE products (

id MEDIUMINT UNSIGNED AUTO_INCREMENT, -- 适中范围

status TINYINT UNSIGNED, -- 明确无符号

quantity SMALLINT UNSIGNED, -- 适合范围

category_id SMALLINT UNSIGNED -- 适合范围

);

-- 2. 字符串类型优化

-- 不推荐

CREATE TABLE users (

id INT,

name VARCHAR(255), -- 长度过长

email VARCHAR(255), -- 长度过长

phone CHAR(20), -- 使用固定长度

description TEXT -- 未限制长度

);

-- 推荐

CREATE TABLE users (

id INT,

name VARCHAR(50), -- 适合长度

email VARCHAR(100), -- 适合长度

phone VARCHAR(20), -- 可变长度

description VARCHAR(1000) -- 能预估上限时,用VARCHAR并限制长度

);

-- 3. 时间类型优化

-- 不推荐

CREATE TABLE orders (

id INT,

created_at VARCHAR(30), -- 使用字符串存储时间

updated_at VARCHAR(30), -- 使用字符串存储时间

delivery_date DATETIME -- 不需要时间精度

);

-- 推荐

CREATE TABLE orders (

id INT,

created_at TIMESTAMP, -- 自动更新时间戳

updated_at TIMESTAMP, -- 自动更新时间戳

delivery_date DATE -- 只需要日期

);

-- 4. 小数类型优化

-- 不推荐

CREATE TABLE financial_records (

id INT,

amount FLOAT, -- 存在精度问题

price DOUBLE, -- 存在精度问题

tax_rate DECIMAL(10,2) -- 范围过大

);

-- 推荐

CREATE TABLE financial_records (

id INT,

amount DECIMAL(8,2), -- 精确定点数

price DECIMAL(8,2), -- 精确定点数

tax_rate DECIMAL(5,2) -- 适合范围

);

-- 5. 枚举类型优化

-- 不推荐

CREATE TABLE tickets (

id INT,

status VARCHAR(20), -- 使用字符串存储固定选项

priority VARCHAR(20) -- 使用字符串存储固定选项

);

-- 推荐

CREATE TABLE tickets (

id INT,

status ENUM('open', 'closed', 'pending'), -- 使用枚举

priority ENUM('low', 'medium', 'high') -- 使用枚举

);

-- 6. JSON类型优化(MySQL 5.7+)

-- 不推荐

CREATE TABLE product_attributes (

id INT,

attributes VARCHAR(4000) -- 使用字符串存储JSON

);

-- 推荐

CREATE TABLE product_attributes (

id INT,

attributes JSON, -- 使用JSON类型

INDEX idx_attr ((CAST(attributes->>'$.color' AS CHAR(20)))) -- JSON索引

);
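Before shrinking existing columns along the lines above, it is worth measuring how much of the declared width is actually used. A small sketch against the users table from these examples:

-- Longest values actually stored vs. the declared column definitions
SELECT MAX(CHAR_LENGTH(name))  AS max_name_len,
       MAX(CHAR_LENGTH(email)) AS max_email_len,
       MAX(CHAR_LENGTH(phone)) AS max_phone_len
FROM users;

SELECT COLUMN_NAME, COLUMN_TYPE, CHARACTER_MAXIMUM_LENGTH
FROM information_schema.COLUMNS
WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = 'users';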

2. Normalization and Denormalization

-- 1. 范式设计示例

-- 第三范式设计

CREATE TABLE orders (

order_id INT PRIMARY KEY,

user_id INT,

order_date DATE,

status VARCHAR(20),

FOREIGN KEY (user_id) REFERENCES users(id)

);

CREATE TABLE order_items (

order_item_id INT PRIMARY KEY,

order_id INT,

product_id INT,

quantity INT,

unit_price DECIMAL(10,2),

FOREIGN KEY (order_id) REFERENCES orders(order_id),

FOREIGN KEY (product_id) REFERENCES products(id)

);

-- 2. 反范式设计示例

-- 为了查询效率,在orders表中冗余一些常用信息

CREATE TABLE orders_denormalized (

order_id INT PRIMARY KEY,

user_id INT,

user_name VARCHAR(50), -- 冗余用户名

user_email VARCHAR(100), -- 冗余用户邮箱

order_date DATE,

status VARCHAR(20),

total_amount DECIMAL(10,2), -- 冗余订单总额

total_items INT, -- 冗余商品总数

FOREIGN KEY (user_id) REFERENCES users(id)

);

-- 3. 混合设计示例

-- 保持基本范式,但添加汇总表

CREATE TABLE order_summaries (

order_id INT PRIMARY KEY,

total_amount DECIMAL(10,2),

total_items INT,

last_updated TIMESTAMP,

FOREIGN KEY (order_id) REFERENCES orders(order_id)

);

-- 创建触发器自动更新汇总信息

DELIMITER $$

CREATE TRIGGER after_order_item_insert

AFTER INSERT ON order_items

FOR EACH ROW

BEGIN

INSERT INTO order_summaries (order_id, total_amount, total_items)

SELECT

NEW.order_id,

SUM(quantity * unit_price),

SUM(quantity)

FROM order_items

WHERE order_id = NEW.order_id

ON DUPLICATE KEY UPDATE

total_amount = VALUES(total_amount),

total_items = VALUES(total_items),

last_updated = CURRENT_TIMESTAMP;

END $$

DELIMITER ;

-- 4. 垂直分表示例

-- 将大字段拆分到单独的表

CREATE TABLE products (

id INT PRIMARY KEY,

name VARCHAR(100),

price DECIMAL(10,2),

category_id INT

);

CREATE TABLE product_details (

product_id INT PRIMARY KEY,

description TEXT,

specifications JSON,

images JSON,

FOREIGN KEY (product_id) REFERENCES products(id)

);

-- 5. 水平分表示例

-- 按时间范围分表

CREATE TABLE orders_2023 (

order_id INT PRIMARY KEY,

user_id INT,

order_date DATE,

CHECK (YEAR(order_date) = 2023)

);

CREATE TABLE orders_2024 (

order_id INT PRIMARY KEY,

user_id INT,

order_date DATE,

CHECK (YEAR(order_date) = 2024)

);
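Any denormalized or summary table can drift from the base data if some code path bypasses the trigger, so a periodic consistency check is a useful companion to the design above. A sketch against the order_summaries table:

-- Rows where the stored summary no longer matches the recomputed values
SELECT s.order_id, s.total_amount AS stored_amount, x.calc_amount
FROM order_summaries s
JOIN (
    SELECT order_id, SUM(quantity * unit_price) AS calc_amount
    FROM order_items
    GROUP BY order_id
) x ON x.order_id = s.order_id
WHERE s.total_amount <> x.calc_amount;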

IV. Partitioning and Table Sharding

1. Partitioning Strategies

-- 1. 范围分区(RANGE)

CREATE TABLE orders (

order_id INT,

order_date DATE,

amount DECIMAL(10,2),

customer_id INT

)

PARTITION BY RANGE (YEAR(order_date)) (

PARTITION p2022 VALUES LESS THAN (2023),

PARTITION p2023 VALUES LESS THAN (2024),

PARTITION p2024 VALUES LESS THAN (2025),

PARTITION pmax VALUES LESS THAN MAXVALUE

);

-- 2. 列表分区(LIST)

CREATE TABLE sales (

id INT,

amount DECIMAL(10,2),

region VARCHAR(20)

)

PARTITION BY LIST COLUMNS(region) (

PARTITION p_east VALUES IN ('east', 'northeast'),

PARTITION p_west VALUES IN ('west', 'northwest'),

PARTITION p_cent VALUES IN ('central'),

PARTITION p_south VALUES IN ('south', 'southeast')

);

-- 3. 哈希分区(HASH)

CREATE TABLE products (

id INT,

name VARCHAR(100),

created_at DATE

)

PARTITION BY HASH(id)

PARTITIONS 4;

-- 4. 复合分区策略

CREATE TABLE customer_orders (

order_id INT,

customer_id INT,

order_date DATE,

amount DECIMAL(10,2)

)

PARTITION BY RANGE(YEAR(order_date))

SUBPARTITION BY HASH(customer_id)

SUBPARTITIONS 4 (

PARTITION p2023 VALUES LESS THAN (2024),

PARTITION p2024 VALUES LESS THAN (2025),

PARTITION pmax VALUES LESS THAN MAXVALUE

);

-- 5. 分区维护

-- 添加新分区(表尾已有MAXVALUE分区时,不能直接ADD PARTITION,需重组pmax来追加)

ALTER TABLE orders REORGANIZE PARTITION pmax INTO (

PARTITION p2025 VALUES LESS THAN (2026),

PARTITION pmax VALUES LESS THAN MAXVALUE

);

-- 删除分区

ALTER TABLE orders DROP PARTITION p2022;

-- 重组分区(按YEAR的RANGE分区只能以年为粒度拆分/合并,这里演示合并相邻分区)

ALTER TABLE orders REORGANIZE PARTITION p2023, p2024 INTO (

PARTITION p2023_2024 VALUES LESS THAN (2025)

);

-- 6. 分区查询优化

-- 使用分区键进行查询

EXPLAIN SELECT *

FROM orders

WHERE order_date BETWEEN '2023-01-01' AND '2023-12-31';

-- 分区信息查看

SELECT

PARTITION_NAME,

PARTITION_ORDINAL_POSITION,

TABLE_ROWS

FROM information_schema.PARTITIONS

WHERE TABLE_NAME = 'orders';

-- 7. 分区表维护

-- 检查分区

ALTER TABLE orders CHECK PARTITION p2023, p2024;

-- 优化分区

ALTER TABLE orders OPTIMIZE PARTITION p2023, p2024;

-- 分析分区

ALTER TABLE orders ANALYZE PARTITION p2023, p2024;

-- 重建分区

ALTER TABLE orders REBUILD PARTITION p2023, p2024;
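One more maintenance pattern worth knowing: a whole partition can be archived almost instantly by swapping it with an empty, identically structured table. A sketch assuming the yearly layout of the orders table above; orders_archive_2023 is a hypothetical archive table:

-- Archive one partition via a metadata-only swap
CREATE TABLE orders_archive_2023 LIKE orders;
ALTER TABLE orders_archive_2023 REMOVE PARTITIONING;
ALTER TABLE orders EXCHANGE PARTITION p2023 WITH TABLE orders_archive_2023;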

2. Sharding Strategies

-- 1. 水平分表策略

-- 按用户ID范围分表

CREATE TABLE orders_0000_0999 (

order_id BIGINT PRIMARY KEY,

user_id INT CHECK (user_id BETWEEN 0 AND 999),

order_date TIMESTAMP,

amount DECIMAL(10,2)

);

CREATE TABLE orders_1000_1999 (

order_id BIGINT PRIMARY KEY,

user_id INT CHECK (user_id BETWEEN 1000 AND 1999),

order_date TIMESTAMP,

amount DECIMAL(10,2)

);

-- 创建路由视图

CREATE VIEW orders_all AS

SELECT * FROM orders_0000_0999

UNION ALL

SELECT * FROM orders_1000_1999;

-- 2. 垂直分表策略

-- 基础信息表

CREATE TABLE products_base (

product_id INT PRIMARY KEY,

name VARCHAR(100),

price DECIMAL(10,2),

category_id INT

);

-- 详细信息表

CREATE TABLE products_details (

product_id INT PRIMARY KEY,

description TEXT,

specifications JSON,

created_at TIMESTAMP,

updated_at TIMESTAMP,

FOREIGN KEY (product_id) REFERENCES products_base(product_id)

);

-- 3. 时间分表策略

-- 按月分表

CREATE TABLE orders_202401 (

order_id BIGINT PRIMARY KEY,

user_id INT,

order_date TIMESTAMP CHECK (order_date >= '2024-01-01' AND order_date < '2024-02-01'),

amount DECIMAL(10,2)

);

CREATE TABLE orders_202402 (

order_id BIGINT PRIMARY KEY,

user_id INT,

order_date TIMESTAMP CHECK (order_date >= '2024-02-01' AND order_date < '2024-03-01'),

amount DECIMAL(10,2)

);

-- 4. 分表管理存储过程

DELIMITER $$

CREATE PROCEDURE create_monthly_order_table(IN p_year INT, IN p_month INT)

BEGIN

SET @table_name = CONCAT('orders_', p_year, LPAD(p_month, 2, '0'));

SET @start_date = DATE_FORMAT(CONCAT(p_year, '-', p_month, '-01'), '%Y-%m-%d');

SET @end_date = DATE_FORMAT(DATE_ADD(@start_date, INTERVAL 1 MONTH), '%Y-%m-%d');

SET @sql = CONCAT(

'CREATE TABLE ', @table_name, ' (

order_id BIGINT PRIMARY KEY,

user_id INT,

order_date TIMESTAMP CHECK (order_date >= ''', @start_date, ''' AND order_date < ''', @end_date, '''),

amount DECIMAL(10,2),

INDEX idx_user_id (user_id),

INDEX idx_order_date (order_date)

)'

);

PREPARE stmt FROM @sql;

EXECUTE stmt;

DEALLOCATE PREPARE stmt;

END $$

DELIMITER ;

-- 5. 分表查询优化

-- 创建分表汇总视图

CREATE VIEW orders_summary AS

SELECT 'orders_202401' as table_name, COUNT(*) as record_count,

MIN(order_date) as min_date, MAX(order_date) as max_date

FROM orders_202401

UNION ALL

SELECT 'orders_202402' as table_name, COUNT(*) as record_count,

MIN(order_date) as min_date, MAX(order_date) as max_date

FROM orders_202402;

-- 6. 分表数据迁移

-- 创建数据迁移存储过程

DELIMITER $$

CREATE PROCEDURE migrate_orders_data(

IN source_table VARCHAR(50),

IN target_table VARCHAR(50),

IN batch_size INT

)

BEGIN

DECLARE done INT DEFAULT FALSE;

-- 表名参数不能直接当作标识符使用,需要拼接为动态SQL执行

-- 获取最小ID

SET @sql = CONCAT('SELECT MIN(order_id) INTO @batch_start FROM ', source_table);

PREPARE stmt FROM @sql; EXECUTE stmt; DEALLOCATE PREPARE stmt;

REPEAT

-- 批量迁移数据

SET @sql = CONCAT(

'INSERT INTO ', target_table,

' SELECT * FROM ', source_table,

' WHERE order_id >= ', @batch_start,

' AND order_id < ', @batch_start + batch_size

);

PREPARE stmt FROM @sql; EXECUTE stmt; DEALLOCATE PREPARE stmt;

-- 更新起始ID

SET @batch_start = @batch_start + batch_size;

-- 检查是否完成

SET @sql = CONCAT(

'SELECT COUNT(*) = 0 INTO @no_more FROM (SELECT 1 FROM ', source_table,

' WHERE order_id >= ', @batch_start, ' LIMIT 1) t'

);

PREPARE stmt FROM @sql; EXECUTE stmt; DEALLOCATE PREPARE stmt;

SET done = @no_more;

-- 每批之间稍作停顿,降低对线上业务的影响

DO SLEEP(0.1);

UNTIL done END REPEAT;

END $$

DELIMITER ;
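With per-month tables like orders_202401, the application (or a thin SQL layer) has to route each query to the right shard. A minimal routing sketch using a prepared statement; the date literal is only an example:

-- Derive the shard name from the date and query it dynamically
SET @d = '2024-02-15';
SET @t = CONCAT('orders_', DATE_FORMAT(@d, '%Y%m'));
SET @from_d = DATE_FORMAT(@d, '%Y-%m-01');
SET @to_d = DATE_FORMAT(DATE_ADD(@from_d, INTERVAL 1 MONTH), '%Y-%m-%d');
SET @sql = CONCAT('SELECT COUNT(*) FROM ', @t,
                  ' WHERE order_date >= ? AND order_date < ?');
PREPARE stmt FROM @sql;
EXECUTE stmt USING @from_d, @to_d;
DEALLOCATE PREPARE stmt;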

V. Caching Optimization

1. Query Caching

-- 1. MySQL查询缓存配置(适用于MySQL 5.7及以下版本)

SET GLOBAL query_cache_type = 1;

SET GLOBAL query_cache_size = 134217728; -- 128MB

-- 2. 应用层查询缓存实现

-- 创建缓存表

CREATE TABLE query_cache (

cache_key VARCHAR(255) PRIMARY KEY,

cache_value TEXT,

expires_at TIMESTAMP,

created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,

updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP

);

-- 创建缓存管理存储过程

DELIMITER $$

CREATE PROCEDURE cache_query(

IN p_cache_key VARCHAR(255),

IN p_query TEXT,

IN p_expire_minutes INT

)

BEGIN

DECLARE v_cache_value TEXT;

DECLARE v_is_expired BOOLEAN;

-- 检查缓存是否存在且未过期

SELECT

cache_value,

expires_at < NOW() INTO v_cache_value, v_is_expired

FROM query_cache

WHERE cache_key = p_cache_key;

-- 如果缓存不存在或已过期,执行查询并更新缓存

IF v_cache_value IS NULL OR v_is_expired THEN

SET @sql = p_query;

PREPARE stmt FROM @sql;

EXECUTE stmt;

-- 将结果存入缓存

-- 注意:这里假设查询结果可以转换为JSON格式

SET @result = (SELECT JSON_ARRAYAGG(JSON_OBJECT(

'column1', column1,

'column2', column2

-- 添加更多列

)) FROM (SELECT * FROM tmp_result) t);

-- 更新缓存

INSERT INTO query_cache (cache_key, cache_value, expires_at)

VALUES (

p_cache_key,

@result,

DATE_ADD(NOW(), INTERVAL p_expire_minutes MINUTE)

)

ON DUPLICATE KEY UPDATE

cache_value = VALUES(cache_value),

expires_at = VALUES(expires_at);

DEALLOCATE PREPARE stmt;

ELSE

-- 返回缓存的结果

SELECT JSON_EXTRACT(v_cache_value, '$[*]');

END IF;

END $$

DELIMITER ;

-- 3. 缓存预热

DELIMITER $$

CREATE PROCEDURE warm_up_cache()

BEGIN

-- 预热热门商品缓存

CALL cache_query(

'hot_products',

'SELECT * FROM products WHERE sales_count > 1000 ORDER BY sales_count DESC LIMIT 100',

60 -- 缓存60分钟

);

-- 预热分类统计缓存

CALL cache_query(

'category_stats',

'SELECT category_id, COUNT(*) as product_count, AVG(price) as avg_price FROM products GROUP BY category_id',

120 -- 缓存120分钟

);

END $$

DELIMITER ;

-- 4. 缓存清理

DELIMITER $$

CREATE PROCEDURE clean_expired_cache()

BEGIN

-- 删除过期缓存

DELETE FROM query_cache WHERE expires_at < NOW();

-- 压缩缓存表

OPTIMIZE TABLE query_cache;

END $$

DELIMITER ;

-- 创建定时任务

CREATE EVENT clean_cache_event

ON SCHEDULE EVERY 1 HOUR

DO CALL clean_expired_cache();

-- 5. 缓存监控

CREATE VIEW cache_statistics AS

SELECT

COUNT(*) as total_cache_entries,

COUNT(CASE WHEN expires_at < NOW() THEN 1 END) as expired_entries,

AVG(TIMESTAMPDIFF(MINUTE, created_at, expires_at)) as avg_cache_duration,

MAX(updated_at) as last_cache_update

FROM query_cache;
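The block above defines warm_up_cache but only schedules the cleanup job; if the warm-up is meant to run periodically as well, it needs its own event, and the event scheduler must be enabled. A small sketch:

-- Make sure scheduled events actually run
SET GLOBAL event_scheduler = ON;

-- Re-warm the hot caches every 30 minutes
CREATE EVENT warm_up_cache_event
ON SCHEDULE EVERY 30 MINUTE
DO CALL warm_up_cache();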

2. Caching Strategies

-- 1. 多级缓存实现

CREATE TABLE cache_levels (

id INT PRIMARY KEY,

level_name VARCHAR(50),

priority INT,

max_size BIGINT,

expire_time INT -- 秒数

);

INSERT INTO cache_levels VALUES

(1, 'memory', 1, 1073741824, 300), -- 1GB, 5分钟

(2, 'redis', 2, 10737418240, 3600), -- 10GB, 1小时

(3, 'disk', 3, 107374182400, 86400); -- 100GB, 1天

-- 2. 缓存队列表

CREATE TABLE cache_queue (

id BIGINT PRIMARY KEY AUTO_INCREMENT,

cache_key VARCHAR(255),

cache_level INT,

priority INT,

status ENUM('pending', 'processing', 'completed', 'failed'),

created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,

updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,

FOREIGN KEY (cache_level) REFERENCES cache_levels(id)

);

-- 3. 缓存淘汰策略实现

DELIMITER $$

CREATE PROCEDURE evict_cache(IN p_level_id INT)

BEGIN

DECLARE v_max_size BIGINT;

DECLARE v_current_size BIGINT;

-- 获取缓存级别的最大大小

SELECT max_size INTO v_max_size

FROM cache_levels

WHERE id = p_level_id;

-- 获取当前缓存大小

SELECT SUM(LENGTH(cache_value)) INTO v_current_size

FROM query_cache

WHERE cache_level = p_level_id;

-- 如果超过最大大小,执行LRU淘汰

IF v_current_size > v_max_size THEN

DELETE FROM query_cache

WHERE cache_level = p_level_id

AND id IN (

SELECT id

FROM (

SELECT id

FROM query_cache

WHERE cache_level = p_level_id

ORDER BY updated_at ASC

LIMIT 100

) tmp

);

END IF;

END $$

DELIMITER ;

-- 4. 缓存预加载策略

DELIMITER $$

CREATE PROCEDURE preload_cache()

BEGIN

-- 预加载热门商品

INSERT INTO cache_queue (cache_key, cache_level, priority, status)

SELECT

CONCAT('product:', id),

1, -- memory缓存级别

10, -- 高优先级

'pending'

FROM products

WHERE views_count > 1000

ORDER BY views_count DESC

LIMIT 100;

-- 预加载分类统计

INSERT INTO cache_queue (cache_key, cache_level, priority, status)

SELECT

CONCAT('category_stats:', id),

2, -- redis缓存级别

5, -- 中等优先级

'pending'

FROM categories;

END $$

DELIMITER ;

-- 5. 缓存一致性维护

DELIMITER $$

CREATE TRIGGER after_product_update

AFTER UPDATE ON products

FOR EACH ROW

BEGIN

-- 清除相关缓存

DELETE FROM query_cache

WHERE cache_key LIKE CONCAT('product:', NEW.id, '%');

-- 添加缓存重建任务

INSERT INTO cache_queue (cache_key, cache_level, priority, status)

VALUES (

CONCAT('product:', NEW.id),

1, -- memory缓存级别

10, -- 高优先级

'pending'

);

END $$

DELIMITER ;

-- 6. 缓存监控和统计

CREATE VIEW cache_level_statistics AS

SELECT

cl.level_name,

COUNT(*) as entry_count,

SUM(LENGTH(qc.cache_value)) as total_size,

AVG(TIMESTAMPDIFF(SECOND, qc.created_at, qc.expires_at)) as avg_ttl,

COUNT(CASE WHEN qc.expires_at < NOW() THEN 1 END) as expired_count

FROM query_cache qc

JOIN cache_levels cl ON qc.cache_level = cl.id

GROUP BY cl.level_name;
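A worker that consumes cache_queue should claim one task at a time without colliding with other workers. A sketch using FOR UPDATE SKIP LOCKED (MySQL 8.0+); the id value in the UPDATE is a placeholder for whatever the SELECT returned:

START TRANSACTION;

-- Claim the highest-priority pending task; rows locked by other workers are skipped
SELECT id, cache_key, cache_level
FROM cache_queue
WHERE status = 'pending'
ORDER BY priority DESC, id
LIMIT 1
FOR UPDATE SKIP LOCKED;

-- Mark the claimed row (id 42 is a placeholder) as being processed
UPDATE cache_queue SET status = 'processing' WHERE id = 42;

COMMIT;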

VI. Monitoring and Maintenance

1. Performance Monitoring

-- 1. 慢查询监控

-- 配置慢查询日志

SET GLOBAL slow_query_log = 'ON';

SET GLOBAL slow_query_log_file = '/var/log/mysql/slow-query.log';

SET GLOBAL long_query_time = 2; -- 设置慢查询阈值为2秒

SET GLOBAL log_output = 'FILE,TABLE'; -- 同时写入文件和mysql.slow_log表,便于用SQL分析

-- 创建慢查询分析视图

CREATE VIEW slow_query_analysis AS

SELECT

db AS db_name,

SUBSTRING_INDEX(SUBSTRING_INDEX(sql_text, ' ', 1), '\n', 1) as query_pattern,

COUNT(*) as execution_count,

AVG(query_time) as avg_time,

MAX(query_time) as max_time,

SUM(rows_examined) as total_rows_examined,

SUM(rows_sent) as total_rows_sent

FROM mysql.slow_log

GROUP BY db_name, query_pattern

ORDER BY avg_time DESC;

-- 2. 连接数监控

CREATE TABLE connection_log (

id BIGINT AUTO_INCREMENT PRIMARY KEY,

timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,

total_connections INT,

active_connections INT,

waiting_connections INT

);

DELIMITER $$

CREATE PROCEDURE monitor_connections()

BEGIN

INSERT INTO connection_log (total_connections, active_connections, waiting_connections)

SELECT

COUNT(*) as total_connections,

COUNT(CASE WHEN command != 'Sleep' THEN 1 END) as active_connections,

COUNT(CASE WHEN state = 'Waiting for table lock' THEN 1 END) as waiting_connections

FROM information_schema.processlist;

END $$

DELIMITER ;

-- 创建定时任务

CREATE EVENT monitor_connections_event

ON SCHEDULE EVERY 1 MINUTE

DO CALL monitor_connections();

-- 3. 资源使用监控

CREATE TABLE resource_usage_log (

id BIGINT AUTO_INCREMENT PRIMARY KEY,

timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,

cpu_usage FLOAT,

memory_usage BIGINT,

disk_usage BIGINT,

iops INT

);

-- 4. 查询性能监控

CREATE TABLE query_performance_log (

id BIGINT AUTO_INCREMENT PRIMARY KEY,

query_id VARCHAR(32),

query_text TEXT,

execution_time DECIMAL(10,4),

rows_affected INT,

timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,

explain_plan JSON

);

DELIMITER $$

CREATE TRIGGER before_query_execution

BEFORE INSERT ON query_performance_log

FOR EACH ROW

BEGIN

-- 获取执行计划

SET @explain_json = (

SELECT JSON_ARRAYAGG(

JSON_OBJECT(

'select_type', select_type,

'table', table_name,

'type', access_type,

'rows', rows_examined_per_scan

)

)

FROM information_schema.processlist

WHERE info = NEW.query_text

);

SET NEW.explain_plan = @explain_json;

END $$

DELIMITER ;

-- 5. 性能报告生成

CREATE VIEW performance_summary AS

WITH query_stats AS (

SELECT

DATE(timestamp) as stat_date,

COUNT(*) as total_queries,

AVG(execution_time) as avg_execution_time,

MAX(execution_time) as max_execution_time,

SUM(CASE WHEN execution_time > 1 THEN 1 ELSE 0 END) as slow_queries

FROM query_performance_log

GROUP BY DATE(timestamp)

),

resource_stats AS (

SELECT

DATE(timestamp) as stat_date,

AVG(cpu_usage) as avg_cpu_usage,

MAX(cpu_usage) as max_cpu_usage,

AVG(memory_usage) as avg_memory_usage,

MAX(memory_usage) as max_memory_usage

FROM resource_usage_log

GROUP BY DATE(timestamp)

)

SELECT

qs.stat_date,

qs.total_queries,

qs.avg_execution_time,

qs.slow_queries,

rs.avg_cpu_usage,

rs.avg_memory_usage

FROM query_stats qs

JOIN resource_stats rs ON qs.stat_date = rs.stat_date;

-- 6. 性能预警系统

CREATE TABLE performance_alerts (

id BIGINT AUTO_INCREMENT PRIMARY KEY,

alert_type ENUM('slow_query', 'high_cpu', 'high_memory', 'high_connections'),

alert_message TEXT,

severity ENUM('info', 'warning', 'critical'),

created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP

);

DELIMITER $$

CREATE PROCEDURE check_performance_alerts()

BEGIN

-- 检查慢查询

INSERT INTO performance_alerts (alert_type, alert_message, severity)

SELECT

'slow_query',

CONCAT('Query took ', execution_time, ' seconds: ', LEFT(query_text, 100)),

CASE

WHEN execution_time > 10 THEN 'critical'

WHEN execution_time > 5 THEN 'warning'

ELSE 'info'

END

FROM query_performance_log

WHERE execution_time > 2

AND timestamp > DATE_SUB(NOW(), INTERVAL 5 MINUTE);

-- 检查资源使用

INSERT INTO performance_alerts (alert_type, alert_message, severity)

SELECT

'high_cpu',

CONCAT('CPU usage at ', ROUND(cpu_usage, 2), '%'),

CASE

WHEN cpu_usage > 90 THEN 'critical'

WHEN cpu_usage > 70 THEN 'warning'

ELSE 'info'

END

FROM resource_usage_log

WHERE timestamp > DATE_SUB(NOW(), INTERVAL 5 MINUTE)

AND cpu_usage > 70;

END $$

DELIMITER ;

-- 创建定时检查任务

CREATE EVENT check_performance_alerts_event

ON SCHEDULE EVERY 5 MINUTE

DO CALL check_performance_alerts();
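Before building custom logging tables and triggers like the ones above, it is worth checking what the sys schema already aggregates from the performance_schema statement digests; for example:

-- Statements ranked by average latency, with no extra instrumentation needed
SELECT query, exec_count, avg_latency, rows_examined_avg, full_scan
FROM sys.statement_analysis
ORDER BY avg_latency DESC
LIMIT 10;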

2. Routine Maintenance

-- 1. 表维护程序

DELIMITER $$

CREATE PROCEDURE maintain_tables()

BEGIN

DECLARE done INT DEFAULT FALSE;

DECLARE v_table_name VARCHAR(255);

DECLARE cur_tables CURSOR FOR

SELECT TABLE_NAME

FROM information_schema.TABLES

WHERE TABLE_SCHEMA = DATABASE();

DECLARE CONTINUE HANDLER FOR NOT FOUND SET done = TRUE;

-- 创建维护日志

CREATE TABLE IF NOT EXISTS maintenance_log (

id BIGINT AUTO_INCREMENT PRIMARY KEY,

table_name VARCHAR(255),

operation VARCHAR(50),

start_time TIMESTAMP,

end_time TIMESTAMP,

status VARCHAR(20),

error_message TEXT

);

OPEN cur_tables;

read_loop: LOOP

FETCH cur_tables INTO v_table_name;

IF done THEN

LEAVE read_loop;

END IF;

-- 记录开始时间

INSERT INTO maintenance_log (table_name, operation, start_time, status)

VALUES (v_table_name, 'OPTIMIZE', NOW(), 'RUNNING');

-- 执行维护操作

SET @sql = CONCAT('OPTIMIZE TABLE ', v_table_name);

BEGIN

DECLARE CONTINUE HANDLER FOR SQLEXCEPTION

BEGIN

UPDATE maintenance_log

SET status = 'ERROR',

error_message = 'Error during optimization',

end_time = NOW()

WHERE table_name = v_table_name

AND status = 'RUNNING';

END;

PREPARE stmt FROM @sql;

EXECUTE stmt;

DEALLOCATE PREPARE stmt;

-- 更新完成状态

UPDATE maintenance_log

SET status = 'COMPLETED',

end_time = NOW()

WHERE table_name = v_table_name

AND status = 'RUNNING';

END;

END LOOP;

CLOSE cur_tables;

END $$

DELIMITER ;

-- 2. 统计信息更新

DELIMITER $$

CREATE PROCEDURE update_statistics()

BEGIN

-- 更新表统计信息

ANALYZE TABLE users, orders, products;

-- 更新列的直方图统计信息(MySQL 8.0+)

ANALYZE TABLE users UPDATE HISTOGRAM ON email;

END $$

DELIMITER ;

-- 3. 数据清理

DELIMITER $$

CREATE PROCEDURE cleanup_old_data()

BEGIN

-- 设置清理时间阈值

SET @cleanup_date = DATE_SUB(CURRENT_DATE, INTERVAL 1 YEAR);

-- 开始事务

START TRANSACTION;

-- 清理旧日志

DELETE FROM system_logs

WHERE created_at < @cleanup_date;

-- 清理旧会话

DELETE FROM user_sessions

WHERE last_activity < @cleanup_date;

-- 归档旧订单

INSERT INTO orders_archive

SELECT * FROM orders

WHERE order_date < @cleanup_date;

DELETE FROM orders

WHERE order_date < @cleanup_date;

-- 提交事务

COMMIT;

END $$

DELIMITER ;

-- 4. 备份管理

DELIMITER $$

CREATE PROCEDURE manage_backups()

BEGIN

-- 记录备份开始

INSERT INTO backup_log (backup_type, start_time, status)

VALUES ('FULL', NOW(), 'RUNNING');

SET @backup_id = LAST_INSERT_ID();

-- 执行备份

SET @backup_file = CONCAT('/backups/full_', DATE_FORMAT(NOW(), '%Y%m%d_%H%i%s'), '.sql');

SET @backup_command = CONCAT(

'mysqldump -u root -p --all-databases --events --routines ',

'> ', @backup_file

);

-- 执行系统命令(需要适当的权限)

SET @result = sys_exec(@backup_command);

-- 更新备份状态

UPDATE backup_log

SET end_time = NOW(),

status = IF(@result = 0, 'COMPLETED', 'FAILED'),

file_path = @backup_file

WHERE id = @backup_id;

-- 清理旧备份

SET @cleanup_date = DATE_SUB(CURRENT_DATE, INTERVAL 30 DAY);

DELETE FROM backup_log

WHERE start_time < @cleanup_date;

END $$

DELIMITER ;

-- 5. 创建维护计划

DELIMITER $$

CREATE EVENT daily_maintenance

ON SCHEDULE EVERY 1 DAY

STARTS CURRENT_DATE + INTERVAL 1 DAY + INTERVAL 1 HOUR

DO

BEGIN

CALL maintain_tables();

CALL update_statistics();

CALL cleanup_old_data();

END $$

DELIMITER ;

CREATE EVENT weekly_backup

ON SCHEDULE EVERY 1 WEEK

STARTS CURRENT_DATE + INTERVAL 1 WEEK

DO

CALL manage_backups();

-- 6. 维护监控

CREATE VIEW maintenance_status AS

SELECT

DATE(start_time) as maintenance_date,

operation,

COUNT(*) as total_operations,

COUNT(CASE WHEN status = 'COMPLETED' THEN 1 END) as successful_operations,

COUNT(CASE WHEN status = 'ERROR' THEN 1 END) as failed_operations,

AVG(TIMESTAMPDIFF(SECOND, start_time, end_time)) as avg_duration_seconds

FROM maintenance_log

GROUP BY DATE(start_time), operation;
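OPTIMIZE TABLE rebuilds the whole table, so it pays to target only fragmented tables rather than everything; a quick ranking query that can feed the maintenance job above:

-- Tables with the most reclaimable space (DATA_FREE) in the current schema
SELECT TABLE_NAME,
       ROUND(DATA_LENGTH / 1024 / 1024, 1) AS data_mb,
       ROUND(DATA_FREE / 1024 / 1024, 1)   AS free_mb
FROM information_schema.TABLES
WHERE TABLE_SCHEMA = DATABASE()
  AND TABLE_TYPE = 'BASE TABLE'
ORDER BY DATA_FREE DESC
LIMIT 20;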

3. Failure Recovery

-- 1. 故障检测和记录

CREATE TABLE system_failures (

id BIGINT AUTO_INCREMENT PRIMARY KEY,

failure_type VARCHAR(50),

failure_description TEXT,

detection_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,

resolution_time TIMESTAMP,

resolution_description TEXT,

affected_tables TEXT,

severity ENUM('low', 'medium', 'high', 'critical'),

status ENUM('detected', 'investigating', 'resolving', 'resolved')

);

-- 2. 自动故障检测程序

DELIMITER $$

CREATE PROCEDURE detect_database_issues()

BEGIN

-- 检查表损坏

INSERT INTO system_failures (

failure_type,

failure_description,

affected_tables,

severity,

status

)

SELECT

'TABLE_CORRUPTION',

CONCAT('Table ', table_name, ' may be corrupted'),

table_name,

'high',

'detected'

FROM information_schema.TABLES

WHERE TABLE_SCHEMA = DATABASE()

AND TABLE_TYPE = 'BASE TABLE'

AND ENGINE IS NULL; -- 基表识别不到存储引擎,通常意味着表定义或数据文件已损坏

-- 检查连接问题

IF (

SELECT COUNT(*)

FROM information_schema.PROCESSLIST

WHERE command = 'Sleep'

) > 1000 THEN

INSERT INTO system_failures (

failure_type,

failure_description,

severity,

status

)

VALUES (

'CONNECTION_OVERLOAD',

'Too many sleeping connections',

'medium',

'detected'

);

END IF;

-- 检查磁盘空间

IF (

SELECT SUM(DATA_FREE)

FROM information_schema.TABLES

WHERE TABLE_SCHEMA = DATABASE()

) < 1073741824 THEN -- 1GB

INSERT INTO system_failures (

failure_type,

failure_description,

severity,

status

)

VALUES (

'LOW_DISK_SPACE',

'Database is running low on disk space',

'critical',

'detected'

);

END IF;

END $$

DELIMITER ;

-- 3. 自动恢复程序

DELIMITER $$

CREATE PROCEDURE attempt_recovery(IN p_failure_id BIGINT)

BEGIN

DECLARE v_failure_type VARCHAR(50);

DECLARE v_affected_tables TEXT;

-- 获取故障信息

SELECT failure_type, affected_tables

INTO v_failure_type, v_affected_tables

FROM system_failures

WHERE id = p_failure_id;

-- 更新状态为处理中

UPDATE system_failures

SET status = 'investigating'

WHERE id = p_failure_id;

-- 根据故障类型执行恢复操作

CASE v_failure_type

WHEN 'TABLE_CORRUPTION' THEN

BEGIN

-- 尝试修复表

SET @repair_sql = CONCAT('REPAIR TABLE ', v_affected_tables);

PREPARE stmt FROM @repair_sql;

EXECUTE stmt;

DEALLOCATE PREPARE stmt;

-- 验证修复结果

SET @check_sql = CONCAT('CHECK TABLE ', v_affected_tables);

PREPARE stmt FROM @check_sql;

EXECUTE stmt;

DEALLOCATE PREPARE stmt;

END;

WHEN 'CONNECTION_OVERLOAD' THEN

BEGIN

-- KILL 不接受子查询,需用游标逐个清理空闲连接

DECLARE v_pid BIGINT;

DECLARE v_done INT DEFAULT FALSE;

DECLARE cur_sleep CURSOR FOR SELECT id FROM information_schema.PROCESSLIST WHERE command = 'Sleep' AND time > 3600;

DECLARE CONTINUE HANDLER FOR NOT FOUND SET v_done = TRUE;

OPEN cur_sleep;

kill_loop: LOOP

FETCH cur_sleep INTO v_pid;

IF v_done THEN LEAVE kill_loop; END IF;

SET @kill_sql = CONCAT('KILL CONNECTION ', v_pid);

PREPARE kstmt FROM @kill_sql; EXECUTE kstmt; DEALLOCATE PREPARE kstmt;

END LOOP;

CLOSE cur_sleep;

END;

WHEN 'LOW_DISK_SPACE' THEN

BEGIN

-- 执行紧急清理

CALL cleanup_old_data();

-- 优化表空间

OPTIMIZE TABLE users, orders, products;

END;

END CASE;

-- 更新恢复状态

UPDATE system_failures

SET status = 'resolved',

resolution_time = NOW(),

resolution_description = CONCAT('Automatic recovery for ', v_failure_type, ' completed')

WHERE id = p_failure_id;

END $$

DELIMITER ;

-- 4. 数据恢复程序

DELIMITER $$

CREATE PROCEDURE recover_data(

IN p_table_name VARCHAR(255),

IN p_backup_date DATETIME,

IN p_where_clause TEXT

)

BEGIN

-- 记录恢复操作

INSERT INTO recovery_log (

table_name,

backup_date,

where_clause,

start_time,

status

)

VALUES (

p_table_name,

p_backup_date,

p_where_clause,

NOW(),

'STARTED'

);

SET @recovery_id = LAST_INSERT_ID();

-- 创建临时表

SET @create_tmp = CONCAT(

'CREATE TABLE tmp_recovery_', @recovery_id,

' LIKE ', p_table_name

);

PREPARE stmt FROM @create_tmp;

EXECUTE stmt;

-- 从备份恢复数据

SET @restore_data = CONCAT(

'INSERT INTO tmp_recovery_', @recovery_id,

' SELECT * FROM ', p_table_name, '_backup',

' WHERE backup_date = ', QUOTE(p_backup_date),

' AND ', p_where_clause

);

PREPARE stmt FROM @restore_data;

EXECUTE stmt;

-- 合并恢复的数据

SET @merge_data = CONCAT(

'REPLACE INTO ', p_table_name,

' SELECT * FROM tmp_recovery_', @recovery_id

);

PREPARE stmt FROM @merge_data;

EXECUTE stmt;

-- 清理临时表

SET @drop_tmp = CONCAT('DROP TABLE tmp_recovery_', @recovery_id);

PREPARE stmt FROM @drop_tmp;

EXECUTE stmt;

DEALLOCATE PREPARE stmt;

-- 更新恢复日志

UPDATE recovery_log

SET end_time = NOW(),

status = 'COMPLETED',

rows_recovered = ROW_COUNT()

WHERE id = @recovery_id;

END $$

DELIMITER ;

-- 5. 创建恢复点

DELIMITER $$

CREATE PROCEDURE create_recovery_point(

IN p_description VARCHAR(255)

)

BEGIN

-- 记录恢复点

INSERT INTO recovery_points (

description,

created_at,

status

)

VALUES (

p_description,

NOW(),

'CREATING'

);

SET @recovery_point_id = LAST_INSERT_ID();

-- 创建表快照

CREATE TABLE recovery_point_data (

recovery_point_id BIGINT,

table_name VARCHAR(255),

data_snapshot LONGBLOB,

created_at TIMESTAMP

);

-- 保存主要表的状态

INSERT INTO recovery_point_data

SELECT

@recovery_point_id,

table_name,

snapshot,

NOW()

FROM (

SELECT

'users' as table_name,

user_data as snapshot

FROM users

UNION ALL

SELECT

'orders' as table_name,

order_data as snapshot

FROM orders

) as snapshots;

-- 更新恢复点状态

UPDATE recovery_points

SET status = 'CREATED'

WHERE id = @recovery_point_id;

END $$

DELIMITER ;
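As a practical note on the recovery procedure above: REPAIR TABLE only applies to MyISAM/ARCHIVE/CSV tables, so for InnoDB the usual manual steps are to confirm the damage and then rebuild the table (or restore from backup). A minimal sketch:

-- Confirm a suspected corruption
CHECK TABLE orders EXTENDED;

-- Rebuild an InnoDB table in place (a "null" ALTER)
ALTER TABLE orders ENGINE = InnoDB;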

VII. Security Optimization

1. Privilege Management

-- 1. 创建角色和权限系统

CREATE TABLE roles (

role_id INT PRIMARY KEY AUTO_INCREMENT,

role_name VARCHAR(50) UNIQUE,

description TEXT,

created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,

updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP

);

CREATE TABLE permissions (

permission_id INT PRIMARY KEY AUTO_INCREMENT,

permission_name VARCHAR(100) UNIQUE,

description TEXT,

resource_type VARCHAR(50),

access_level ENUM('READ', 'WRITE', 'ADMIN')

);

CREATE TABLE role_permissions (

role_id INT,

permission_id INT,

granted_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,

granted_by VARCHAR(100),

PRIMARY KEY (role_id, permission_id),

FOREIGN KEY (role_id) REFERENCES roles(role_id),

FOREIGN KEY (permission_id) REFERENCES permissions(permission_id)

);

CREATE TABLE user_roles (

user_id INT,

role_id INT,

granted_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,

expires_at TIMESTAMP NULL,

granted_by VARCHAR(100),

PRIMARY KEY (user_id, role_id),

FOREIGN KEY (role_id) REFERENCES roles(role_id)

);

-- 2. 创建权限管理存储过程

DELIMITER $$

CREATE PROCEDURE grant_user_role(

IN p_user_id INT,

IN p_role_name VARCHAR(50),

IN p_granted_by VARCHAR(100)

)

BEGIN

DECLARE v_role_id INT;

-- 获取角色ID

SELECT role_id INTO v_role_id

FROM roles

WHERE role_name = p_role_name;

-- 授予角色

INSERT INTO user_roles (user_id, role_id, granted_by)

VALUES (p_user_id, v_role_id, p_granted_by)

ON DUPLICATE KEY UPDATE

granted_at = CURRENT_TIMESTAMP,

granted_by = p_granted_by;

-- 记录审计日志

INSERT INTO security_audit_log (

action_type,

user_id,

role_id,

performed_by,

action_description

)

VALUES (

'GRANT_ROLE',

p_user_id,

v_role_id,

p_granted_by,

CONCAT('Granted role ', p_role_name, ' to user ', p_user_id)

);

END $$

CREATE PROCEDURE revoke_user_role(

IN p_user_id INT,

IN p_role_name VARCHAR(50),

IN p_revoked_by VARCHAR(100)

)

BEGIN

DECLARE v_role_id INT;

-- 获取角色ID

SELECT role_id INTO v_role_id

FROM roles

WHERE role_name = p_role_name;

-- 撤销角色

DELETE FROM user_roles

WHERE user_id = p_user_id

AND role_id = v_role_id;

-- 记录审计日志

INSERT INTO security_audit_log (

action_type,

user_id,

role_id,

performed_by,

action_description

)

VALUES (

'REVOKE_ROLE',

p_user_id,

v_role_id,

p_revoked_by,

CONCAT('Revoked role ', p_role_name, ' from user ', p_user_id)

);

END $$

DELIMITER ;

-- 3. 创建安全视图

CREATE VIEW user_permissions AS

SELECT

ur.user_id,

r.role_name,

p.permission_name,

p.resource_type,

p.access_level

FROM user_roles ur

JOIN roles r ON ur.role_id = r.role_id

JOIN role_permissions rp ON r.role_id = rp.role_id

JOIN permissions p ON rp.permission_id = p.permission_id

JOIN users u ON ur.user_id = u.id

WHERE (ur.expires_at IS NULL OR ur.expires_at > NOW());

-- 4. 创建权限检查函数

DELIMITER $$

CREATE FUNCTION check_user_permission(

p_user_id INT,

p_permission_name VARCHAR(100)

) RETURNS BOOLEAN

DETERMINISTIC

BEGIN

DECLARE has_permission BOOLEAN;

SELECT EXISTS (

SELECT 1

FROM user_permissions

WHERE user_id = p_user_id

AND permission_name = p_permission_name

) INTO has_permission;

RETURN has_permission;

END $$

DELIMITER ;

-- 5. 设置默认安全策略

-- 创建基本角色

INSERT INTO roles (role_name, description) VALUES

('admin', 'Full system access'),

('developer', 'Development and testing access'),

('analyst', 'Read-only access to analytics'),

('user', 'Basic user access');

-- 创建基本权限

INSERT INTO permissions (permission_name, resource_type, access_level) VALUES

('READ_USER_DATA', 'USER', 'READ'),

('WRITE_USER_DATA', 'USER', 'WRITE'),

('MANAGE_USERS', 'USER', 'ADMIN'),

('READ_ANALYTICS', 'ANALYTICS', 'READ'),

('GENERATE_REPORTS', 'REPORTS', 'WRITE');

-- 分配角色权限

INSERT INTO role_permissions (role_id, permission_id)

SELECT

r.role_id,

p.permission_id

FROM roles r

CROSS JOIN permissions p

WHERE r.role_name = 'admin';

-- 6. 审计日志

CREATE TABLE security_audit_log (

id BIGINT AUTO_INCREMENT PRIMARY KEY,

action_type VARCHAR(50),

user_id INT,

role_id INT,

permission_id INT,

performed_by VARCHAR(100),

action_description TEXT,

action_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,

ip_address VARCHAR(45),

success BOOLEAN

);
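The tables above implement roles at the application level; MySQL 8.0 also has native server-level roles that can complement them for database logins. A sketch with hypothetical names (mydb, report_user):

-- Native roles (MySQL 8.0)
CREATE ROLE 'app_read', 'app_write';
GRANT SELECT ON mydb.* TO 'app_read';
GRANT INSERT, UPDATE, DELETE ON mydb.* TO 'app_write';

-- Attach a role to a login and make it active by default
CREATE USER 'report_user'@'%' IDENTIFIED BY 'change_me';
GRANT 'app_read' TO 'report_user'@'%';
SET DEFAULT ROLE 'app_read' TO 'report_user'@'%';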

2. Encryption and Data Masking

-- 1. 创建加密函数

DELIMITER $$

CREATE FUNCTION encrypt_data(p_data TEXT, p_key VARCHAR(32))

RETURNS VARBINARY(1000)

DETERMINISTIC

BEGIN

RETURN AES_ENCRYPT(p_data, p_key);

END $$

CREATE FUNCTION decrypt_data(p_data VARBINARY(1000), p_key VARCHAR(32))

RETURNS TEXT

DETERMINISTIC

BEGIN

RETURN AES_DECRYPT(p_data, p_key);

END $$

-- 2. 创建脱敏函数

CREATE FUNCTION mask_email(p_email VARCHAR(255))

RETURNS VARCHAR(255)

DETERMINISTIC

BEGIN

DECLARE v_local_part VARCHAR(255);

DECLARE v_domain VARCHAR(255);

SET v_local_part = SUBSTRING_INDEX(p_email, '@', 1);

SET v_domain = SUBSTRING_INDEX(p_email, '@', -1);

RETURN CONCAT(

LEFT(v_local_part, 2),

REPEAT('*', LENGTH(v_local_part) - 2),

'@',

v_domain

);

END $$

CREATE FUNCTION mask_phone(p_phone VARCHAR(20))

RETURNS VARCHAR(20)

DETERMINISTIC

BEGIN

RETURN CONCAT(

LEFT(p_phone, 3),

REPEAT('*', LENGTH(p_phone) - 7),

RIGHT(p_phone, 4)

);

END $$

CREATE FUNCTION mask_card(p_card VARCHAR(20))

RETURNS VARCHAR(20)

DETERMINISTIC

BEGIN

RETURN CONCAT(

'XXXX-XXXX-XXXX-',

RIGHT(p_card, 4)

);

END $$

DELIMITER ;

-- 3. 创建敏感数据表

CREATE TABLE sensitive_data (

id INT PRIMARY KEY AUTO_INCREMENT,

data_type VARCHAR(50),

raw_data VARBINARY(1000),

encryption_key_id INT,

created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,

updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,

INDEX idx_data_type (data_type)

);

-- 4. 创建加密密钥管理

CREATE TABLE encryption_keys (

id INT PRIMARY KEY AUTO_INCREMENT,

key_name VARCHAR(100),

key_value VARBINARY(1000),

created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,

expires_at TIMESTAMP,

status ENUM('active', 'expired', 'revoked'),

INDEX idx_status (status)

);

-- 5. 创建数据访问控制

DELIMITER $$

CREATE PROCEDURE access_sensitive_data(

IN p_user_id INT,

IN p_data_type VARCHAR(50),

IN p_data_id INT

)

BEGIN

DECLARE v_has_permission BOOLEAN;

DECLARE v_encrypted_data VARBINARY(1000);

DECLARE v_key_value VARCHAR(32);

-- 检查权限

SELECT EXISTS (

SELECT 1 FROM user_permissions

WHERE user_id = p_user_id

AND permission_name = CONCAT('READ_', p_data_type)

) INTO v_has_permission;

IF v_has_permission THEN

-- 获取加密数据和密钥

SELECT

sd.raw_data,

ek.key_value

INTO

v_encrypted_data,

v_key_value

FROM sensitive_data sd

JOIN encryption_keys ek ON sd.encryption_key_id = ek.id

WHERE sd.id = p_data_id

AND sd.data_type = p_data_type

AND ek.status = 'active';

-- 解密并返回数据

SELECT decrypt_data(v_encrypted_data, v_key_value) as decrypted_data;

-- 记录访问日志

INSERT INTO data_access_log (

user_id,

data_type,

data_id,

access_time,

access_type

)

VALUES (

p_user_id,

p_data_type,

p_data_id,

NOW(),

'READ'

);

ELSE

SIGNAL SQLSTATE '45000'

SET MESSAGE_TEXT = 'Permission denied';

END IF;

END $$

DELIMITER ;

-- 6. 创建数据脱敏视图

CREATE VIEW masked_user_data AS

SELECT

id,

username,

mask_email(email) as email,

mask_phone(phone) as phone,

city,

country,

created_at

FROM users;

-- 7. 创建加密数据备份程序

DELIMITER $$

CREATE PROCEDURE backup_sensitive_data()

BEGIN

DECLARE v_backup_key VARCHAR(32);

-- 生成备份密钥

SET v_backup_key = SHA2(CONCAT(UUID(), RAND()), 256);

-- 创建加密备份

CREATE TABLE sensitive_data_backup AS

SELECT

id,

data_type,

encrypt_data(raw_data, v_backup_key) as encrypted_backup,

NOW() as backup_time

FROM sensitive_data;

-- 安全存储备份密钥

INSERT INTO encryption_keys (

key_name,

key_value,

expires_at,

status

)

VALUES (

CONCAT('BACKUP_KEY_', DATE_FORMAT(NOW(), '%Y%m%d')),

v_backup_key,

DATE_ADD(NOW(), INTERVAL 30 DAY),

'active'

);

END $$

DELIMITER ;

-- 8. 创建密钥轮换程序

DELIMITER $$

CREATE PROCEDURE rotate_encryption_keys()

BEGIN

DECLARE v_new_key VARCHAR(32);

DECLARE v_old_key_id INT;

START TRANSACTION;

-- 生成新密钥

SET v_new_key = SHA2(CONCAT(UUID(), RAND()), 256);

-- 保存新密钥

INSERT INTO encryption_keys (

key_name,

key_value,

status

)

VALUES (

CONCAT('KEY_', DATE_FORMAT(NOW(), '%Y%m%d')),

v_new_key,

'active'

);

-- 记录新密钥ID,再找出之前处于active状态的旧密钥

SET @new_key_id = LAST_INSERT_ID();

SET v_old_key_id = (

SELECT id

FROM encryption_keys

WHERE status = 'active'

AND id <> @new_key_id

ORDER BY created_at DESC

LIMIT 1

);

-- 重新加密数据

UPDATE sensitive_data

SET raw_data = encrypt_data(

decrypt_data(raw_data, (

SELECT key_value

FROM encryption_keys

WHERE id = encryption_key_id

)),

v_new_key

),

encryption_key_id = @new_key_id

WHERE encryption_key_id = v_old_key_id;

-- 废除旧密钥

UPDATE encryption_keys

SET status = 'expired'

WHERE id = v_old_key_id;

COMMIT;

END $$

DELIMITER ;
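A quick round-trip check of the helper functions defined above; the key literal is purely illustrative, and real keys should come from a key-management system rather than SQL text:

SELECT mask_email('zhangsan@example.com') AS masked_email,
       mask_phone('13812345678')          AS masked_phone,
       decrypt_data(encrypt_data('secret', 'k1k2k3k4'), 'k1k2k3k4') AS round_trip;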

3. Audit Trails

-- 1. 创建审计日志表

CREATE TABLE audit_log (

id BIGINT AUTO_INCREMENT PRIMARY KEY,

action_type ENUM('INSERT', 'UPDATE', 'DELETE', 'SELECT', 'LOGIN', 'LOGOUT'),

table_name VARCHAR(100),

record_id BIGINT,

old_value JSON,

new_value JSON,

user_id INT,

application_user VARCHAR(100),

action_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,

client_ip VARCHAR(45),

user_agent VARCHAR(255)

);

-- 2. 创建审计触发器

DELIMITER $$

-- 为用户表创建审计触发器

CREATE TRIGGER users_audit_insert

AFTER INSERT ON users

FOR EACH ROW

BEGIN

INSERT INTO audit_log (

action_type,

table_name,

record_id,

new_value,

user_id,

application_user

)

VALUES (

'INSERT',

'users',

NEW.id,

JSON_OBJECT(

'username', NEW.username,

'email', NEW.email,

'status', NEW.status

),

@current_user_id,

CURRENT_USER()

);

END $$

CREATE TRIGGER users_audit_update

AFTER UPDATE ON users

FOR EACH ROW

BEGIN

INSERT INTO audit_log (

action_type,

table_name,

record_id,

old_value,

new_value,

user_id,

application_user

)

VALUES (

'UPDATE',

'users',

NEW.id,

JSON_OBJECT(

'username', OLD.username,

'email', OLD.email,

'status', OLD.status

),

JSON_OBJECT(

'username', NEW.username,

'email', NEW.email,

'status', NEW.status

),

@current_user_id,

CURRENT_USER()

);

END $$

CREATE TRIGGER users_audit_delete

AFTER DELETE ON users

FOR EACH ROW

BEGIN

INSERT INTO audit_log (

action_type,

table_name,

record_id,

old_value,

user_id,

application_user

)

VALUES (

'DELETE',

'users',

OLD.id,

JSON_OBJECT(

'username', OLD.username,

'email', OLD.email,

'status', OLD.status

),

@current_user_id,

CURRENT_USER()

);

END $$

DELIMITER ;

-- 3. 创建审计查询视图

CREATE VIEW audit_summary AS

SELECT

DATE(action_time) as audit_date,

action_type,

table_name,

COUNT(*) as action_count,

COUNT(DISTINCT user_id) as unique_users

FROM audit_log

GROUP BY DATE(action_time), action_type, table_name;

-- 4. 创建审计报告程序

DELIMITER $$

CREATE PROCEDURE generate_audit_report(

IN p_start_date DATE,

IN p_end_date DATE

)

BEGIN

-- 总体统计

SELECT

action_type,

COUNT(*) as total_actions,

COUNT(DISTINCT user_id) as unique_users,

MIN(action_time) as first_action,

MAX(action_time) as last_action

FROM audit_log

WHERE DATE(action_time) BETWEEN p_start_date AND p_end_date

GROUP BY action_type;

-- 用户活动分析

SELECT

user_id,

application_user,

COUNT(*) as total_actions,

GROUP_CONCAT(DISTINCT action_type) as action_types,

COUNT(DISTINCT table_name) as affected_tables

FROM audit_log

WHERE DATE(action_time) BETWEEN p_start_date AND p_end_date

GROUP BY user_id, application_user

ORDER BY total_actions DESC;

-- 可疑活动检测

SELECT

action_time,

action_type,

table_name,

user_id,

application_user,

client_ip

FROM audit_log

WHERE DATE(action_time) BETWEEN p_start_date AND p_end_date

AND (

-- 大量删除操作

action_type = 'DELETE'

-- 非工作时间的活动

OR HOUR(action_time) NOT BETWEEN 9 AND 18

-- 异常IP访问

OR client_ip NOT LIKE '192.168.%'

);

END $$

-- 5. 创建审计清理程序

CREATE PROCEDURE clean_audit_logs(IN p_days_to_keep INT)

BEGIN

-- 将旧日志归档

INSERT INTO audit_log_archive

SELECT *

FROM audit_log

WHERE action_time < DATE_SUB(CURRENT_DATE, INTERVAL p_days_to_keep DAY);

-- 删除旧日志

DELETE FROM audit_log

WHERE action_time < DATE_SUB(CURRENT_DATE, INTERVAL p_days_to_keep DAY);

END $$

DELIMITER ;

-- 6. 创建审计警报程序

DELIMITER $$

CREATE PROCEDURE check_audit_alerts()

BEGIN

-- 检查可疑活动

INSERT INTO security_alerts (

alert_type,

description,

severity,

created_at

)

SELECT

'SUSPICIOUS_ACTIVITY',

CONCAT(

'Multiple delete operations (',

COUNT(*),

') by user ',

user_id,

' in the last hour'

),

'HIGH',

NOW()

FROM audit_log

WHERE action_type = 'DELETE'

AND action_time > DATE_SUB(NOW(), INTERVAL 1 HOUR)

GROUP BY user_id

HAVING COUNT(*) > 100;

-- 检查异常登录

INSERT INTO security_alerts (

alert_type,

description,

severity,

created_at

)

SELECT

'ABNORMAL_LOGIN',

CONCAT(

'Multiple failed login attempts (',

COUNT(*),

') from IP ',

client_ip

),

'HIGH',

NOW()

FROM audit_log

WHERE action_type = 'LOGIN'

AND action_time > DATE_SUB(NOW(), INTERVAL 1 HOUR)

AND new_value->>'$.success' = 'false'

GROUP BY client_ip

HAVING COUNT(*) > 5;

END $$

DELIMITER ;
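check_audit_alerts above is only defined, not scheduled; if it is meant to run automatically, an event along these lines does it (the security_alerts table is assumed to exist already, as in the procedure itself):

CREATE EVENT check_audit_alerts_event
ON SCHEDULE EVERY 10 MINUTE
DO CALL check_audit_alerts();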


I'm 岳哥. Feel free to follow, and see you in the next issue~
