Python文件操作与异常处理
一、文件基础操作
- 文件打开与关闭
使用 open() 函数打开文件,完成后必须关闭(推荐使用 with 语句,由它自动关闭文件):
1 2 3 4 5 6 7 8 9 10 11
# Approach 1: open explicitly and guarantee the close with try/finally.
file = open('example.txt', 'r')
try:
    content = file.read()
finally:
    # Runs whether or not read() raised, so the handle is always released.
    file.close()

# Approach 2: the with-statement closes the file when the block exits.
with open('example.txt', 'r') as file:
    content = file.read()
- 文件打开模式
| 模式 | 描述 |
| --- | --- |
| 'r' | 只读(默认) |
| 'w' | 写入(覆盖) |
| 'x' | 独占创建(文件存在则失败) |
| 'a' | 追加 |
| 'b' | 二进制模式 |
| 't' | 文本模式(默认) |
| '+' | 读写模式 |
组合示例:'rb'(二进制只读),'w+'(读写,打开时会清空原有内容)
- 文件读写方法
读取操作
1 2 3 4 5 6 7 8 9 10 11 12 13
with open('example.txt', 'r') as f:
    # read() returns the whole file and leaves the position at end-of-file.
    content = f.read()

    # Each demo below rewinds first; without seek(0) the calls would start
    # at end-of-file and return '' / [] / iterate zero times.
    f.seek(0)
    line = f.readline()      # a single line, including its trailing newline

    f.seek(0)
    lines = f.readlines()    # every line, as a list of strings

    f.seek(0)
    for line in f:           # lazy line-by-line iteration
        print(line.strip())
写入操作
1 2 3 4 5 6 7 8 9
# Mode 'w' creates output.txt or truncates an existing one.
with open('output.txt', 'w') as f:
    # write() emits one string; newlines are not added automatically.
    f.write("Hello, World!\n")

    # writelines() emits each item as-is, so every item carries its own "\n".
    lines = ["Line 1\n", "Line 2\n", "Line 3\n"]
    f.writelines(lines)
- 文件指针操作
1 2 3 4 5 6 7 8 9 10 11
# Binary mode is used because relative seeks (whence 1 and 2 with a non-zero
# offset) are not supported on text-mode files.
with open('example.txt', 'rb+') as f:
    pos = f.tell()       # current byte offset from the start of the file

    f.seek(10)           # whence defaults to 0: absolute offset 10
    f.seek(5, 1)         # whence=1: 5 bytes forward from the current position
    f.seek(-3, 2)        # whence=2: 3 bytes before the end of the file

    chunk = f.read(20)   # read up to 20 bytes from the current position
二、目录与文件系统操作
- os模块基础
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
import os

# Query the current working directory, then switch to another one.
cwd = os.getcwd()
os.chdir('/path/to/directory')

# Entry names (not full paths) in the current directory.
files = os.listdir('.')

# mkdir() creates a single level; makedirs() also creates missing parents and,
# with exist_ok=True, tolerates an existing target; rmdir() removes an empty
# directory only.
os.mkdir('new_dir')
os.makedirs('path/to/new_dir', exist_ok=True)
os.rmdir('empty_dir')
- pathlib模块(Python 3.4+)
更面向对象的路径操作方式:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
from pathlib import Path

p = Path('example.txt')

# Existence and type checks, then the name components.
print(p.exists())
print(p.is_file())
print(p.is_dir())
print(p.suffix)   # extension including the dot
print(p.stem)     # file name without the extension

# Whole-file convenience helpers: each opens and closes the file itself.
content = p.read_text()
p.write_text('New content')

# The / operator joins path segments.
new_p = p.parent / 'subdir' / 'newfile.txt'
- 文件信息与权限
1 2 3 4 5 6 7 8 9 10 11 12 13
import os
import stat
import time

file_stat = os.stat('example.txt')

print(f"大小: {file_stat.st_size} 字节")
print(f"最后修改时间: {time.ctime(file_stat.st_mtime)}")
# The last three octal digits of st_mode are the user/group/other bits.
print(f"权限: {oct(file_stat.st_mode)[-3:]}")

# Owner read + owner write only.
os.chmod('example.txt', stat.S_IRUSR | stat.S_IWUSR)
- 文件遍历与搜索
1 2 3 4 5 6 7 8 9
from pathlib import Path

# rglob() searches the directory tree recursively.
for file in Path('.').rglob('*.py'):
    print(file)

# glob() with a leading '**/' is the explicit spelling of the same recursion.
for file in Path('.').glob('**/*.txt'):
    print(file)
三、异常处理机制
- 基本异常处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
try:
    result = 10 / 0
except ZeroDivisionError:
    # Most specific handler first.
    print("不能除以零!")
except (TypeError, ValueError) as e:
    # A tuple catches several exception types with one handler.
    print(f"类型或值错误: {e}")
except Exception as e:
    # Catch-all for anything the handlers above did not match.
    print(f"未知错误: {e}")
else:
    # Runs only when the try body raised nothing.
    print("计算成功")
finally:
    # Runs in every case, exception or not.
    print("清理资源")
- 常见内置异常
| 异常 | 描述 |
| --- | --- |
| FileNotFoundError | 文件不存在 |
| PermissionError | 权限不足 |
| IsADirectoryError | 预期文件但得到目录 |
| NotADirectoryError | 预期目录但得到文件 |
| IOError | 输入输出错误(自 Python 3.3 起是 OSError 的别名) |
| OSError | 操作系统相关错误 |
| ValueError | 值无效 |
| TypeError | 类型错误 |
| KeyError | 字典键不存在 |
| IndexError | 序列索引超出范围 |
- 自定义异常
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
class InvalidDataError(Exception):
    """Custom exception that carries the offending data alongside a message."""

    def __init__(self, message, data):
        # Keep the payload on the instance so handlers can inspect it;
        # Exception itself stores the message used by str(exc).
        self.data = data
        super().__init__(message)
def process_data(data):
    """Raise InvalidDataError when *data* is falsy; otherwise return None."""
    if data:
        return
    raise InvalidDataError("数据不能为空", data)
# None is falsy, so process_data raises and the handler reports both the
# message and the data attached to the exception.
try:
    process_data(None)
except InvalidDataError as e:
    print(f"错误: {e}, 数据: {e.data}")
- 异常链
1 2 3 4 5
try:
    open("nonexistent.txt")
except FileNotFoundError as e:
    # "from e" records the original error as __cause__, so the traceback
    # shows both the low-level failure and the higher-level one.
    raise RuntimeError("处理文件时出错") from e
四、文件操作中的异常处理
- 安全的文件操作模式
1 2 3 4 5 6 7 8 9 10 11 12 13
def safe_read_file(filename):
    """Return the text content of *filename*, or None if it cannot be read.

    A missing file, insufficient permissions and undecodable content are each
    reported with a printed message; any other exception propagates.
    """
    try:
        with open(filename, 'r') as f:
            text = f.read()
    except FileNotFoundError:
        print(f"文件 {filename} 不存在")
    except PermissionError:
        print(f"没有权限读取文件 {filename}")
    except UnicodeDecodeError:
        print(f"文件 {filename} 编码错误")
    else:
        return text
    # Reached only after one of the handled failures above.
    return None
- 原子写入模式
确保写入操作要么完全成功,要么完全不修改文件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
import os
from tempfile import NamedTemporaryFile


def atomic_write(filename, data):
    """Write *data* to *filename* so the file is replaced whole or left untouched.

    The data is written to a temporary file in the target's directory, synced
    to disk, and then moved over *filename* with os.replace(). On any failure
    the temporary file is removed and the exception is re-raised.

    Args:
        filename: Path of the file to create or replace.
        data: Text to write.
    """
    # The temp file must be in the same directory as the target so that
    # os.replace() is a rename within one filesystem.
    target_dir = os.path.dirname(os.path.abspath(filename))
    tmp = NamedTemporaryFile('w', dir=target_dir, delete=False)
    tempname = tmp.name
    try:
        with tmp:
            tmp.write(data)
            tmp.flush()
            os.fsync(tmp.fileno())
        # The handle is closed here: Windows refuses to rename or delete a
        # file that is still open, so replace must come after the close.
        os.replace(tempname, filename)
    except BaseException:
        # Clean up on every failure, including KeyboardInterrupt, then
        # re-raise; never let a cleanup error mask the original one.
        try:
            os.unlink(tempname)
        except OSError:
            pass
        raise
- 文件锁(跨进程安全)
1 2 3 4 5 6 7 8 9
import fcntl


def locked_write(filename, data):
    """Append *data* to *filename* while holding an exclusive flock on it.

    Uses fcntl, so this is available on Unix-like systems only.

    Args:
        filename: Path of the file to append to (created if missing).
        data: Text to append.
    """
    with open(filename, 'a') as f:
        # Acquire outside the try block: if acquiring fails there is no lock
        # to release.
        fcntl.flock(f, fcntl.LOCK_EX)
        try:
            f.write(data)
            # write() only fills Python's buffer; flush while the lock is
            # still held, otherwise the bytes reach the file after unlock.
            f.flush()
        finally:
            fcntl.flock(f, fcntl.LOCK_UN)
五、高级文件处理技术
- 内存映射文件
处理大文件的高效方式:
1 2 3 4 5 6 7 8 9 10 11 12
import mmap

with open('large_file.bin', 'r+b') as f:
    # Length 0 maps the whole file. The with-statement closes the mapping
    # even if one of the slice operations below raises.
    with mmap.mmap(f.fileno(), 0) as mm:
        print(mm[:10])            # read through slicing, like bytes
        # Slice assignment writes in place; the replacement must have
        # exactly the same length as the slice it replaces.
        mm[10:20] = b'x' * 10
- 临时文件与目录
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
from tempfile import TemporaryFile, NamedTemporaryFile, TemporaryDirectory

# Anonymous temporary file: removed as soon as it is closed.
with TemporaryFile('w+t') as f:
    f.write('临时内容')
    f.seek(0)   # rewind so the read below sees what was just written
    print(f.read())

# Named temporary file; delete=False leaves it on disk after the block.
with NamedTemporaryFile('w+t', suffix='.tmp', delete=False) as f:
    print(f"临时文件路径: {f.name}")
    f.write('具名临时内容')

# Temporary directory: removed together with its contents on exit.
with TemporaryDirectory() as tmpdir:
    print(f"临时目录: {tmpdir}")
- 压缩文件处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
import gzip
import zipfile

# 'wt' opens the gzip stream in text mode, so str can be written directly.
with gzip.open('example.txt.gz', 'wt') as f:
    f.write('压缩内容')

# Build an archive from two existing files.
with zipfile.ZipFile('archive.zip', 'w') as zf:
    zf.write('file1.txt')
    zf.write('file2.txt')

# Unpack every member into the given directory.
with zipfile.ZipFile('archive.zip', 'r') as zf:
    zf.extractall('extracted_files')
六、实际应用案例
- 配置文件解析器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
import configparser


def load_config(config_path):
    """Load the [database] settings from an INI file.

    Args:
        config_path: Path of the configuration file.

    Returns:
        {'database': {'host': str, 'port': int, 'user': str}}; 'user' falls
        back to 'admin' when the option is absent.

    Raises:
        FileNotFoundError: The file does not exist.
        RuntimeError: The file cannot be parsed, a required section or option
            is missing, or 'port' is not an integer.
        OSError: The file exists but cannot be opened (e.g. no permission).
    """
    config = configparser.ConfigParser()
    try:
        # Open the file ourselves instead of checking existence and calling
        # config.read(): read() silently skips files it cannot open, which
        # would later surface as a misleading "format error".
        with open(config_path) as fh:
            config.read_file(fh)
        return {
            'database': {
                'host': config.get('database', 'host'),
                'port': config.getint('database', 'port'),
                'user': config.get('database', 'user', fallback='admin'),
            }
        }
    except FileNotFoundError as e:
        raise FileNotFoundError(f"配置文件 {config_path} 不存在") from e
    except (configparser.Error, ValueError) as e:
        raise RuntimeError(f"配置文件格式错误: {e}") from e
- 日志文件轮转
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
import logging
from logging.handlers import RotatingFileHandler


def setup_logging(log_file='app.log', max_size=10*1024*1024, backup_count=5):
    """Configure the root logger with a rotating file handler and a console handler.

    Safe to call more than once: handlers installed by an earlier call are
    removed first, so messages are never emitted in duplicate.

    Args:
        log_file: Path of the log file.
        max_size: Size in bytes at which the log file is rotated.
        backup_count: Number of rotated files to keep.

    Returns:
        The configured root logger.
    """
    # Attribute used to recognise handlers that this function installed.
    tag = '_installed_by_setup_logging'

    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    # Drop only our own handlers from a previous call; handlers added by
    # other code are left alone. Iterate over a copy because we mutate.
    for old in [h for h in logger.handlers if getattr(h, tag, False)]:
        logger.removeHandler(old)
        old.close()

    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    file_handler = RotatingFileHandler(
        log_file,
        maxBytes=max_size,
        backupCount=backup_count,
    )
    console_handler = logging.StreamHandler()

    for handler in (file_handler, console_handler):
        handler.setFormatter(formatter)
        setattr(handler, tag, True)
        logger.addHandler(handler)

    return logger
- CSV文件安全处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
import csv
from pathlib import Path

# Leading characters that make a spreadsheet treat a cell as a formula.
_FORMULA_PREFIXES = ('=', '+', '-', '@')


def _is_formula_like(cell):
    """Return True if *cell* is text a spreadsheet would evaluate as a formula."""
    if not isinstance(cell, str) or not cell.startswith(_FORMULA_PREFIXES):
        return False
    try:
        float(cell)
    except ValueError:
        return True
    # A plain signed number such as "-5" or "+3.2" is ordinary data.
    return False


def _row_cells(row):
    """Yield every header name and value of a DictReader row."""
    for key, value in row.items():
        yield key
        # Surplus fields beyond the header are collected by DictReader
        # into a list.
        if isinstance(value, list):
            yield from value
        else:
            yield value


def read_csv_safely(file_path):
    """Read a UTF-8 CSV file into a list of dicts, rejecting formula-like cells.

    Every cell of every row is inspected. Only cells that start with one of
    '=', '+', '-', '@' and are not plain numbers are treated as dangerous, so
    dates, e-mail addresses and negative numbers are accepted.

    Args:
        file_path: Path to a file with a .csv extension.

    Returns:
        One dict per data row, keyed by the header row.

    Raises:
        ValueError: Wrong extension, undecodable content, malformed CSV, or
            a cell that looks like a spreadsheet formula.
    """
    file_path = Path(file_path)
    if file_path.suffix.lower() != '.csv':
        raise ValueError("文件必须是CSV格式")

    try:
        with open(file_path, 'r', newline='', encoding='utf-8') as f:
            rows = list(csv.DictReader(f))
    except UnicodeDecodeError:
        raise ValueError("文件编码不支持") from None
    except csv.Error as e:
        raise ValueError(f"CSV格式错误: {e}") from None

    for row in rows:
        if any(_is_formula_like(cell) for cell in _row_cells(row)):
            raise ValueError("CSV文件可能包含危险内容")
    return rows