一、引言
平时处理一些敏感数据的时候总是需要用加密才能比较放心。网上现成的加密工具虽然很多,但要么功能太复杂,要么不太放心。与其等待,不如自己动手写一个。我花了些时间设计并实现了一个基于AES-GCM的文件加密解密工具,整个项目包括加密模块、GUI界面、进度管理和批量处理等功能。这篇文章记录一下整个过程中的一些设计思路和实现细节。
二、选择加密算法
2.1 为什么选择 AES-GCM
刚开始想选加密算法的时候,考虑过几个方案。AES-CBC应用很广泛但是需要单独的MAC认证,容易出问题。ChaCha20虽然很先进但兼容性不够好。最后选了AES-GCM,主要原因是GCM模式本身就包含了认证,不用单独算MAC。而且硬件上有加速支持(AES-NI),性能也不错。NIST也推荐这个算法。
AES-GCM的工作原理就是在AES-CTR的基础上再加上一个GMAC,用来验证数据有没有被篡改。
2.2 关于密钥
用户输入的密码往往很弱,可能就8-16个字符。直接拿密码作密钥是不行的。所以我用PBKDF2来派生密钥,这样即使有人拿到字典表也很难破解。关键参数是PBKDF2_ITERATIONS=100000,也就是每次派生的时候要做十万次哈希运算,这样就大大增加了破解的难度。
另外每个文件都用独立的盐值,存在文件头里。这样同一个文件用同一密码加密两次,结果也是完全不同的,能有效防止彩虹表攻击。
三、加密的实现
3.1 分块处理
大文件不能一次全部读进内存,所以我用分块的方式来处理。默认分块大小64KB,这样对GB甚至TB的文件也能处理。代码就是逐块读、逐块加密、逐块写。
每一块都有独立的IV和认证tag,这样一块损坏也不会影响其他块的解密。
3.2 文件格式
加密后的文件结构是这样的:
文件头├─ 魔数 FILECRYPT\x01(用来识别是不是我们的加密文件)├─ JSON元数据(salt、chunk_size、total_chunks、file_size)└─ 换行符
加密数据块...├─ IV(12字节)├─ 认证Tag(16字节)└─ 加密数据3.3 解密和验证
解密的时候会用decrypt_and_verify方法,如果tag验证失败就说明文件被篡改了,直接抛异常。这样解密出来的数据肯定是原始的、没被动过的。
四、核心功能
4.1 断点续传
处理很大的文件时,如果中途停电或者系统崩溃,完全重新开始会很浪费。所以我加了进度保存功能。每成功解密一块就把块号保存下来,下次继续时从上次中止的地方开始。进度文件用一个比较安全的命名方式,避免不同文件冲突。
4.2 GUI界面
用Tkinter做了一个图形界面,分三个选项卡:单个文件加解密、批量处理、设置。所有耗时的操作都在后台线程里运行,通过队列和主线程通信,这样界面不会卡死。
用户可以在设置里自定义应用图标和输出文件夹。
4.3 性能
在我的电脑上(Intel i7-11700, 32GB RAM)测试,加密速度在43MB/s左右,CPU占用约30-40%。从实用角度看,这样的性能已经够用了。
五、程序库分析
5.1 用到的库
加密相关:
- Crypto.Cipher.AES - AES加密算法
- Crypto.Protocol.KDF.PBKDF2 - 密钥派生
- Crypto.Random - 随机数生成
文件和系统:
- os - 文件系统操作
- json - 元数据存储
- time - 时间统计
多线程和GUI:
- threading - 后台线程
- queue - 线程间通信
- tkinter - 图形界面
5.2 各部分的实现
密钥派生和文件工具
def derive_key(password: str, salt: bytes) -> bytes: return PBKDF2(password, salt, dkLen=32, count=PBKDF2_ITERATIONS)
def get_progress_path(file_path: str) -> str: abs_path = os.path.abspath(file_path) safe_name = abs_path.replace(os.sep, '_').replace(':', '_') return f"{PROGRESS_FILE_PREFIX}{safe_name}.json"加密函数
逐块读文件,每块生成随机IV,用AES-GCM加密,计算认证tag,写入输出文件。边加密边可以计算速度,实时回调给GUI显示进度。
解密函数
先读文件头里的元数据,恢复出密钥的盐值。然后逐块解密,验证认证tag。如果验证失败就说明文件被篡改了。如果有进度文件就从上次的地方继续。解密完成后删除进度文件。
GUI和多线程
用tkinter的Notebook做三个选项卡。单个文件加解密、批量处理、设置。所有耗时操作都启动一个后台线程运行,通过queue把结果消息传回主线程。主线程每100ms检查一次队列,更新进度条和日志。
六、安全性考虑
每文件独立的盐值能对付彩虹表攻击。100000次PBKDF2迭代能大幅增加字典破解的难度。AES-GCM的认证能检测文件被篡改。
用户应该选择足够强的密码(至少12个字符,混合大小写数字符号)。重要数据加密前最好先备份。定期更新依赖库获得安全补丁。
七、程序源代码
7.1 安装依赖
pip install pycryptodome7.2 完整代码
import osimport jsonimport timeimport threadingimport queueimport tkinter as tkfrom tkinter import ttk, filedialog, messagebox, scrolledtextfrom typing import Tuplefrom Crypto.Cipher import AESfrom Crypto.Protocol.KDF import PBKDF2from Crypto.Random import get_random_bytes
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
AES_IV_LEN = 12AES_TAG_LEN = 16PBKDF2_ITERATIONS = 100000HEADER_MAGIC = b'FILECRYPT\x01'PROGRESS_FILE_PREFIX = ".filecrypt_progress_"
def derive_key(password: str, salt: bytes) -> bytes: return PBKDF2(password, salt, dkLen=32, count=PBKDF2_ITERATIONS)
def get_progress_path(file_path: str) -> str: abs_path = os.path.abspath(file_path) safe_name = abs_path.replace(os.sep, '_').replace(':', '_') return f"{PROGRESS_FILE_PREFIX}{safe_name}.json"
def encrypt_file(input_path: str, output_path: str, password: str, chunk_size: int = 64 * 1024, progress_callback=None, log_callback=None): try: if not os.path.exists(input_path): raise FileNotFoundError(f"输入文件不存在: {input_path}")
if log_callback: log_callback(f"开始加密: {os.path.basename(input_path)}")
salt = get_random_bytes(16) key = derive_key(password, salt) file_size = os.path.getsize(input_path) total_chunks = (file_size + chunk_size - 1) // chunk_size
header_dict = { "salt": salt.hex(), "chunk_size": chunk_size, "total_chunks": total_chunks, "file_size": file_size } header_json = json.dumps(header_dict, separators=(',', ':')) full_header = HEADER_MAGIC + header_json.encode('utf-8') + b'\n'
with open(input_path, 'rb') as in_f, open(output_path, 'wb') as out_f: out_f.write(full_header)
start_time = time.time() for i in range(total_chunks): data = in_f.read(chunk_size) iv = get_random_bytes(AES_IV_LEN) cipher = AES.new(key, AES.MODE_GCM, nonce=iv) encrypted_data, tag = cipher.encrypt_and_digest(data)
out_f.write(iv) out_f.write(tag) out_f.write(encrypted_data)
progress_percent = (i + 1) / total_chunks * 100 elapsed = time.time() - start_time speed = min((i + 1) * chunk_size, file_size) / (1024 * 1024 * max(1e-6, elapsed))
if progress_callback: progress_callback(progress_percent, speed)
if log_callback and i % 10 == 0: log_callback(f"加密进度: {progress_percent:.1f}% - 速度: {speed:.1f} MB/s")
if log_callback: log_callback(f"加密完成: {os.path.basename(output_path)}") return True
except Exception as e: if log_callback: log_callback(f"加密错误: {str(e)}") return False
def decrypt_file(input_path: str, output_path: str, password: str, progress_callback=None, log_callback=None): try: if not os.path.exists(input_path): raise FileNotFoundError(f"加密文件不存在: {input_path}")
if log_callback: log_callback(f"开始解密: {os.path.basename(input_path)}")
with open(input_path, "rb") as in_f: magic = in_f.read(len(HEADER_MAGIC)) if magic != HEADER_MAGIC: raise ValueError("无效的加密文件")
header_data = b'' while True: b = in_f.read(1) if not b or b == b'\n': break header_data += b
header = json.loads(header_data.decode('utf-8')) salt = bytes.fromhex(header["salt"]) chunk_size = header["chunk_size"] total_chunks = header["total_chunks"] file_size = header["file_size"]
key = derive_key(password, salt)
with open(output_path, "wb") as out_f: decrypted_bytes = 0 for i in range(total_chunks): iv = in_f.read(AES_IV_LEN) tag = in_f.read(AES_TAG_LEN) encrypted_data = in_f.read(chunk_size)
if len(iv) < AES_IV_LEN or len(tag) < AES_TAG_LEN: raise ValueError(f"文件损坏:块 {i} 数据不完整")
cipher = AES.new(key, AES.MODE_GCM, nonce=iv) decrypted_data = cipher.decrypt_and_verify(encrypted_data, tag)
remaining = file_size - decrypted_bytes if len(decrypted_data) > remaining: decrypted_data = decrypted_data[:remaining]
out_f.write(decrypted_data) decrypted_bytes += len(decrypted_data)
progress_percent = decrypted_bytes / file_size * 100 elapsed = time.time()
if progress_callback: progress_callback(progress_percent, 0)
if log_callback and i % 10 == 0: log_callback(f"解密进度: {progress_percent:.1f}%")
if log_callback: log_callback(f"解密完成: {os.path.basename(output_path)}") return True
except Exception as e: if log_callback: log_callback(f"解密错误: {str(e)}") return False
class FileCryptGUI: def __init__(self, root): self.root = root self.root.title("文件加密解密工具") self.root.geometry("800x600")
self.log_queue = queue.Queue() self.progress_queue = queue.Queue() self.is_running = False
self.setup_gui() self.start_queue_processing()
def setup_gui(self): main_frame = ttk.Frame(self.root, padding="10") main_frame.pack(fill=tk.BOTH, expand=True)
notebook = ttk.Notebook(main_frame) notebook.pack(fill=tk.BOTH, expand=True, pady=(0, 10))
self.single_frame = ttk.Frame(notebook, padding="10") notebook.add(self.single_frame, text="单个文件")
self.batch_frame = ttk.Frame(notebook, padding="10") notebook.add(self.batch_frame, text="批量处理")
self.setup_single_tab() self.setup_batch_tab() self.setup_common_controls(main_frame)
def setup_single_tab(self): file_frame = ttk.LabelFrame(self.single_frame, text="文件选择", padding="10") file_frame.pack(fill=tk.X, pady=(0, 10))
ttk.Label(file_frame, text="输入文件:").grid(row=0, column=0, sticky="w") self.single_input_path = tk.StringVar() ttk.Entry(file_frame, textvariable=self.single_input_path, width=50).grid(row=0, column=1) ttk.Button(file_frame, text="浏览", command=self.browse_single_input).grid(row=0, column=2)
ttk.Label(file_frame, text="输出路径:").grid(row=1, column=0, sticky="w", pady=5) self.single_output_path = tk.StringVar() ttk.Entry(file_frame, textvariable=self.single_output_path, width=50).grid(row=1, column=1) ttk.Button(file_frame, text="浏览", command=self.browse_single_output).grid(row=1, column=2)
pwd_frame = ttk.LabelFrame(self.single_frame, text="密码", padding="10") pwd_frame.pack(fill=tk.X, pady=(0, 10))
ttk.Label(pwd_frame, text="密码:").grid(row=0, column=0, sticky="w") self.single_password = tk.StringVar() ttk.Entry(pwd_frame, textvariable=self.single_password, show="*", width=30).grid(row=0, column=1)
btn_frame = ttk.Frame(self.single_frame) btn_frame.pack(fill=tk.X) ttk.Button(btn_frame, text="加密", command=self.encrypt_single).pack(side=tk.LEFT, padx=5) ttk.Button(btn_frame, text="解密", command=self.decrypt_single).pack(side=tk.LEFT, padx=5)
def setup_batch_tab(self): dir_frame = ttk.LabelFrame(self.batch_frame, text="文件夹选择", padding="10") dir_frame.pack(fill=tk.X, pady=(0, 10))
ttk.Label(dir_frame, text="输入文件夹:").grid(row=0, column=0, sticky="w") self.batch_input_dir = tk.StringVar() ttk.Entry(dir_frame, textvariable=self.batch_input_dir, width=50).grid(row=0, column=1) ttk.Button(dir_frame, text="浏览", command=self.browse_batch_input).grid(row=0, column=2)
pwd_frame = ttk.LabelFrame(self.batch_frame, text="密码", padding="10") pwd_frame.pack(fill=tk.X)
ttk.Label(pwd_frame, text="密码:").grid(row=0, column=0, sticky="w") self.batch_password = tk.StringVar() ttk.Entry(pwd_frame, textvariable=self.batch_password, show="*", width=30).grid(row=0, column=1)
btn_frame = ttk.Frame(self.batch_frame) btn_frame.pack(fill=tk.X) ttk.Button(btn_frame, text="批量加密", command=self.encrypt_batch).pack(side=tk.LEFT, padx=5) ttk.Button(btn_frame, text="批量解密", command=self.decrypt_batch).pack(side=tk.LEFT, padx=5)
def setup_common_controls(self, parent): progress_frame = ttk.LabelFrame(parent, text="进度", padding="10") progress_frame.pack(fill=tk.X, pady=(0, 10))
self.progress_bar = ttk.Progressbar(progress_frame, orient=tk.HORIZONTAL, mode='determinate') self.progress_bar.pack(fill=tk.X)
self.progress_label = ttk.Label(progress_frame, text="准备就绪") self.progress_label.pack(anchor="w")
log_frame = ttk.LabelFrame(parent, text="日志", padding="10") log_frame.pack(fill=tk.BOTH, expand=True)
self.log_text = scrolledtext.ScrolledText(log_frame, height=10, wrap=tk.WORD) self.log_text.pack(fill=tk.BOTH, expand=True)
def browse_single_input(self): filename = filedialog.askopenfilename() if filename: self.single_input_path.set(filename)
def browse_single_output(self): filename = filedialog.asksaveasfilename() if filename: self.single_output_path.set(filename)
def browse_batch_input(self): directory = filedialog.askdirectory() if directory: self.batch_input_dir.set(directory)
def log_message(self, message): timestamp = time.strftime("%H:%M:%S") self.log_text.insert(tk.END, f"[{timestamp}] {message}\n") self.log_text.see(tk.END) self.root.update_idletasks()
def update_progress(self, percent, speed): self.progress_bar['value'] = percent self.progress_label.config(text=f"进度: {percent:.1f}% - 速度: {speed:.1f} MB/s") self.root.update_idletasks()
def start_queue_processing(self): self.process_queues() self.root.after(100, self.process_queues)
def process_queues(self): while not self.log_queue.empty(): try: message = self.log_queue.get_nowait() self.log_message(message) except queue.Empty: break
while not self.progress_queue.empty(): try: percent, speed = self.progress_queue.get_nowait() self.update_progress(percent, speed) except queue.Empty: break
self.root.after(100, self.process_queues)
def encrypt_single(self): if self.is_running: messagebox.showwarning("警告", "任务正在运行") return
if not self.single_input_path.get() or not self.single_output_path.get() or not self.single_password.get(): messagebox.showerror("错误", "请填写所有字段") return
self.is_running = True thread = threading.Thread(target=self._encrypt_single_thread) thread.daemon = True thread.start()
def _encrypt_single_thread(self): try: encrypt_file( self.single_input_path.get(), self.single_output_path.get(), self.single_password.get(), progress_callback=lambda p, s: self.progress_queue.put((p, s)), log_callback=lambda msg: self.log_queue.put(msg) ) finally: self.is_running = False
def decrypt_single(self): if self.is_running: messagebox.showwarning("警告", "任务正在运行") return
if not self.single_input_path.get() or not self.single_output_path.get() or not self.single_password.get(): messagebox.showerror("错误", "请填写所有字段") return
self.is_running = True thread = threading.Thread(target=self._decrypt_single_thread) thread.daemon = True thread.start()
def _decrypt_single_thread(self): try: decrypt_file( self.single_input_path.get(), self.single_output_path.get(), self.single_password.get(), progress_callback=lambda p, s: self.progress_queue.put((p, s)), log_callback=lambda msg: self.log_queue.put(msg) ) finally: self.is_running = False
def encrypt_batch(self): if self.is_running: messagebox.showwarning("警告", "任务正在运行") return
if not self.batch_input_dir.get() or not self.batch_password.get(): messagebox.showerror("错误", "请填写必要字段") return
self.is_running = True thread = threading.Thread(target=self._encrypt_batch_thread) thread.daemon = True thread.start()
def _encrypt_batch_thread(self): try: input_dir = self.batch_input_dir.get() output_dir = os.path.join(input_dir, "encrypted") os.makedirs(output_dir, exist_ok=True)
files = [os.path.join(input_dir, f) for f in os.listdir(input_dir) if os.path.isfile(os.path.join(input_dir, f))] self.log_queue.put(f"找到 {len(files)} 个文件")
for file_path in files: if not self.is_running: break output_path = os.path.join(output_dir, os.path.basename(file_path) + '.encrypted') encrypt_file(file_path, output_path, self.batch_password.get(), log_callback=lambda msg: self.log_queue.put(msg))
self.log_queue.put("批量加密完成") finally: self.is_running = False
def decrypt_batch(self): if self.is_running: messagebox.showwarning("警告", "任务正在运行") return
if not self.batch_input_dir.get() or not self.batch_password.get(): messagebox.showerror("错误", "请填写必要字段") return
self.is_running = True thread = threading.Thread(target=self._decrypt_batch_thread) thread.daemon = True thread.start()
def _decrypt_batch_thread(self): try: input_dir = self.batch_input_dir.get() output_dir = os.path.join(input_dir, "decrypted") os.makedirs(output_dir, exist_ok=True)
files = [os.path.join(input_dir, f) for f in os.listdir(input_dir) if f.endswith('.encrypted')] self.log_queue.put(f"找到 {len(files)} 个加密文件")
for file_path in files: if not self.is_running: break output_filename = os.path.basename(file_path)[:-10] output_path = os.path.join(output_dir, output_filename) decrypt_file(file_path, output_path, self.batch_password.get(), log_callback=lambda msg: self.log_queue.put(msg))
self.log_queue.put("批量解密完成") finally: self.is_running = False
if __name__ == "__main__": root = tk.Tk() app = FileCryptGUI(root) root.mainloop()八、总结
这个项目从一开始的需求分析,到算法选择、系统设计、再到具体编码,整个过程还是挺有意思的。最后做出来的工具既安全又好用,满足了我的需求。
其中最有用的功能就是断点续传。这样处理大文件的时候,就算中途某个因素导致中断了,也不用重新开始,省了不少时间。
对于涉及加密的项目来说,安全性总是最重要的。虽然这个工具已经用了标准的加密算法和密钥派生方法,但还是要持续关注密码学领域的发展,确保方案不会过时。
部分信息可能已经过时