Python编码规范中,有哪些细节需要注意?

摘要:0 序 基于 AI 整理了一份全面的企业级 Python 编码规范。这份规范将基于 PEP 8、Google Python Style Guide 等业界主流标准,结合现代 Python 开发实践,涵盖代码风格、类型提示、文档规范、测试要求
0 序 基于 AI 整理了一份全面的企业级 Python 编码规范。这份规范将基于 PEP 8、Google Python Style Guide 等业界主流标准,结合现代 Python 开发实践,涵盖代码风格、类型提示、文档规范、测试要求等关键方面。 1 企业级 Python 编码规范 1. 代码风格基础 1.1 缩进与换行 强制要求: 使用 4 个空格 缩进,禁止使用 Tab 每行最大长度 100 字符(比 PEP 8 的 79 更宽松,适应现代显示器) 续行使用括号隐式连接,或使用反斜杠 \(仅在括号内不可行时) # ✅ 正确:使用括号隐式连接 result = some_function( very_long_argument_name_one, very_long_argument_name_two, very_long_argument_name_three, ) # ✅ 正确:条件表达式换行 if( user.is_active and user.has_permission('admin') and not user.is_suspended ): process_admin_action(user) # ❌ 错误:使用反斜杠(除非必要) result = some_function(very_long_argument_name_one, \ very_long_argument_name_two) 1.2 空行规范 # 模块级别:2 空行分隔顶级定义 import os import sys class DatabaseManager: """数据库管理器类""" # 类内方法:1 空行分隔 def __init__(self): self.connection = None def connect(self): """建立连接。""" pass # 相关方法组之间可加 1 空行 def query(self): """执行查询。""" pass # 函数之间:2 空行 def helper_function(): pass def another_helper(): pass 1.3 导入规范 # ✅ 正确:分组排序,每组之间空一行 # 1. 标准库 import os import sys from datetime import datetime from typing import Optional, List # 2. 第三方库 import pandas as pd import numpy as np from fastapi import FastAPI from sqlalchemy import Column, Integer # 3. 本地/项目导入 from myproject.models import User from myproject.utils import format_date # ❌ 错误:混合导入、未排序 import pandas as pd import os from myproject.models import User import sys 导入规则: 绝对导入优先于相对导入 避免 from module import * 类型提示-only 的导入使用 TYPE_CHECKING 保护: from typing import TYPE_CHECKING if TYPE_CHECKING: from myproject.models import User # 仅在类型检查时导入 2. 命名规范 类型 规范 示例 模块/包 小写,下划线分隔 data_processor.py, ml_utils 类 大驼峰 (CapWords) UserManager, HttpResponse 函数/方法 小写,下划线分隔 get_user_by_id, calculate_total 变量 小写,下划线分隔 user_count, is_active 常量 全大写,下划线分隔 MAX_RETRY_COUNT, DEFAULT_TIMEOUT 私有属性 单下划线前缀 _internal_cache, _validate_input() 强私有 双下划线前缀(避免使用) __very_private 魔术方法 双下划线前后缀 __init__, __str__ # ✅ 命名示例 class PaymentGateway: """支付网关类。""" TIMEOUT_SECONDS = 30 # 常量 _instance_count = 0 # 类级私有变量 def __init__(self, api_key: str): self._api_key = api_key # 实例私有变量 self.is_connected = False # 公共属性 def process_payment(self, amount: float) -> str: """处理支付请求。""" transaction_id = self._generate_transaction_id() return transaction_id def _generate_transaction_id(self) -> str: """内部方法:生成交易ID。""" pass 3. 类型提示(强制要求) 所有公共 API 必须添加类型提示,使用 Python 3.9+ 内置泛型类型。 from typing import Optional, Union, Callable, Any from collections.abc import Iterator, Sequence, Mapping # ✅ 函数签名完整类型提示 def fetch_users( user_ids: list[int], include_deleted: bool = False, timeout: float | None = None, ) -> dict[int, User]: """根据ID列表批量获取用户。 Args: user_ids: 用户ID列表 include_deleted: 是否包含已删除用户 timeout: 请求超时时间(秒) Returns: 用户ID到用户对象的映射字典 Raises: ValueError: 当 user_ids 为空时 TimeoutError: 当请求超时时 """ if not user_ids: raise ValueError("user_ids cannot be empty") # ... # ✅ 复杂类型使用 TypeAlias(Python 3.10+) from typing import TypeAlias UserMapping: TypeAlias = dict[int, User] ProcessorFunc: TypeAlias = Callable[[bytes], str] # ✅ 泛型类定义 class Repository[T]: """通用数据仓库类。""" def get(self, id: int) -> T | None: """根据ID获取实体。""" pass def list_all(self) -> list[T]: """获取所有实体列表。""" pass # ✅ Optional 使用 | None 语法(Python 3.10+) def find_user(email: str) -> User | None: pass 4. 文档字符串(Docstrings) 强制使用 Google Style 或 NumPy Style,统一团队选择。 Google Style 示例(推荐) def create_order( customer_id: int, items: list[OrderItem], coupon_code: str | None = None, ) -> Order: """创建新订单。 该方法会验证库存、计算价格、应用优惠券,并创建订单记录。 整个过程是原子性的,任何步骤失败都会回滚。 Args: customer_id: 客户唯一标识 items: 订单商品列表,每个商品包含 sku 和 quantity coupon_code: 可选的优惠券代码,用于折扣计算 Returns: 创建的订单对象,包含生成的订单号和最终价格 Raises: CustomerNotFoundError: 当 customer_id 不存在时 InsufficientInventoryError: 当某个商品库存不足时 InvalidCouponError: 当优惠券无效或已过期时 Example: >>> item = OrderItem(sku="SKU-001", quantity=2) >>> order = create_order(12345, [item], "SAVE20") >>> print(order.total_amount) 159.99 """ pass class PaymentProcessor: """处理支付请求的抽象基类。 实现了通用的支付流程,包括验证、风控检查、执行扣款和通知。 子类需要实现具体的支付渠道逻辑。 Attributes: merchant_id: 商户ID,用于标识交易来源 sandbox_mode: 是否为沙箱环境,沙箱模式下不执行真实扣款 _logger: 内部使用的日志记录器实例 """ def __init__(self, merchant_id: str, sandbox_mode: bool = False): """初始化处理器。 Args: merchant_id: 商户唯一标识符 sandbox_mode: 是否启用沙箱模式 """ self.merchant_id = merchant_id self.sandbox_mode = sandbox_mode self._logger = logging.getLogger(__name__) 5. 代码结构与设计 5.1 函数设计原则 # ✅ 单一职责,参数精简 def calculate_discounted_price( original_price: float, discount_rate: float, ) -> float: """计算折扣后价格。""" if not 0 <= discount_rate <= 1: raise ValueError("discount_rate must be between 0 and 1") return original_price * (1 - discount_rate) # ✅ 使用数据类封装相关参数(参数过多时) from dataclasses import dataclass @dataclass(frozen=True) class PricingContext: """定价上下文对象。""" base_price: float discount_rate: float = 0.0 tax_rate: float = 0.0 coupon_code: str | None = None def calculate_final_price(context: PricingContext) -> float: """根据上下文计算最终价格。""" price = context.base_price * (1 - context.discount_rate) price *= (1 + context.tax_rate) # 应用优惠券逻辑... return round(price, 2) # ❌ 避免:参数过多、职责混杂 def process_payment_v1( user_id: int, amount: float, currency: str, method: str, card_number: str, cvv: str, expiry: str, save_card: bool, send_email: bool, email_template: str, retry_count: int, ): pass 5.2 类设计原则 from abc import ABC, abstractmethod from typing import Protocol # ✅ 优先使用组合而非继承 class Notifier: """通知发送器。""" def __init__(self, channels: list[NotificationChannel]): self._channels = channels def send(self, message: str, priority: str = "normal") -> None: """向所有渠道发送通知。""" for channel in self._channels: if channel.is_available: channel.send(message, priority) # ✅ 使用 Protocol 定义接口(结构化子类型) class NotificationChannel(Protocol): """通知渠道协议。""" @property def is_available(self) -> bool: ... def send(self, message: str, priority: str) -> None: ... # ✅ 抽象基类用于共享实现 class BaseRepository[T](ABC): """通用仓库抽象基类。""" @abstractmethod def get(self, id: int) -> T | None: """根据ID获取实体。""" raise NotImplementedError @abstractmethod def save(self, entity: T) -> T: """保存实体。""" raise NotImplementedError def exists(self, id: int) -> bool: """检查实体是否存在(有默认实现)。""" return self.get(id) is not None 5.3 错误处理 # ✅ 自定义异常层次 class BusinessError(Exception): """业务逻辑错误基类。""" pass class ValidationError(BusinessError): """数据验证错误。""" def __init__(self, field: str, message: str): self.field = field self.message = message super().__init__(f"Validation failed for '{field}': {message}") class PaymentError(BusinessError): """支付处理错误。""" def __init__(self, transaction_id: str, reason: str): self.transaction_id = transaction_id self.reason = reason super().__init__(f"Payment {transaction_id} failed: {reason}") # ✅ 异常处理最佳实践 def process_user_registration(email: str, password: str) -> User: """处理用户注册。 Raises: ValidationError: 当输入数据验证失败时 DuplicateUserError: 当邮箱已被注册时 """ try: validate_email(email) validate_password_strength(password) except ValueError as e: # 转换底层异常为业务异常 raise ValidationError("email", str(e)) from e try: return user_repository.create(email, password) except IntegrityError as e: raise DuplicateUserError(email) from e # ✅ 使用上下文管理器确保资源释放 from contextlib import contextmanager @contextmanager def database_transaction(): """提供数据库事务上下文。""" conn = get_connection() try: yield conn conn.commit() except Exception: conn.rollback() raise finally: conn.close() # 使用示例 with database_transaction() as conn: conn.execute("INSERT INTO users ...") conn.execute("INSERT INTO profiles ...") 6. 异步编程规范 import asyncio from typing import AsyncIterator from collections.abc import AsyncGenerator # ✅ 异步函数命名加 async_ 前缀或使用语义明确的名称 async def async_fetch_data(url: str) -> bytes: """异步获取远程数据。""" async with aiohttp.ClientSession() as session: async with session.get(url) as response: return await response.read() # ✅ 异步生成器 async def stream_large_file(path: str) -> AsyncGenerator[bytes, None]: """异步流式读取大文件。""" async with aiofiles.open(path, 'rb') as f: while chunk := await f.read(8192): yield chunk # ✅ 异步上下文管理器 class AsyncDatabaseConnection: """异步数据库连接上下文管理器。""" async def __aenter__(self) -> "AsyncDatabaseConnection": self._conn = await create_connection() return self async def __aexit__(self, exc_type, exc, tb) -> None: await self._conn.close() async def query(self, sql: str) -> list[dict]: """执行查询。""" return await self._conn.fetch(sql) # ✅ 并发控制 async def process_batch(items: list[Item]) -> list[Result]: """批量处理,限制并发数。""" semaphore = asyncio.Semaphore(10) # 最多10个并发 async def process_with_limit(item: Item) -> Result: async with semaphore: return await process_item(item) return await asyncio.gather(*[process_with_limit(item) for item in items]) 7. 测试规范 7.1 测试结构 # tests/test_user_service.py import pytest from unittest.mock import Mock, patch from myproject.services import UserService from myproject.models import User class TestUserService: """UserService 测试套件。""" @pytest.fixture def user_repo(self): """创建模拟的用户仓库。""" return Mock() @pytest.fixture def service(self, user_repo): """创建被测服务实例。""" return UserService(user_repo) def test_get_user_by_id_success(self, service, user_repo): """测试正常获取用户场景。""" # Arrange expected_user = User(id=1, email="test@example.com") user_repo.get_by_id.return_value = expected_user # Act result = service.get_user_by_id(1) # Assert assert result == expected_user user_repo.get_by_id.assert_called_once_with(1) def test_get_user_by_id_not_found(self, service, user_repo): """测试用户不存在场景。""" user_repo.get_by_id.return_value = None with pytest.raises(UserNotFoundError) as exc_info: service.get_user_by_id(999) assert exc_info.value.user_id == 999 # ✅ 参数化测试 @pytest.mark.parametrize("input,expected", [ ("user@example.com", True), ("invalid-email", False), ("", False), ("@example.com", False), ]) def test_email_validation(input: str, expected: bool): """测试邮箱格式验证。""" assert validate_email_format(input) is expected 7.2 测试覆盖要求 行覆盖率:≥ 80% 分支覆盖率:≥ 70% 关键业务逻辑:必须 100% 覆盖 8. 性能与优化 from functools import lru_cache, wraps from typing import Iterator import time # ✅ 缓存策略 class ProductService: """产品服务,带缓存优化。""" @lru_cache(maxsize=128) def get_category_tree(self, root_id: int) -> dict: """获取分类树(结果可缓存)。""" # 复杂查询... return tree def get_hot_products(self) -> list[Product]: """获取热门产品(使用实例级缓存,带TTL)。""" if self._is_cache_valid(): return self._product_cache products = self._fetch_from_db() self._update_cache(products) return products # ✅ 生成器处理大数据 def process_log_lines(file_path: str) -> Iterator[LogEntry]: """流式处理日志文件,避免内存爆炸。""" with open(file_path, 'r', encoding='utf-8') as f: for line in f: yield parse_log_line(line) # ✅ 性能监控装饰器 def timed(func: Callable) -> Callable: """记录函数执行时间的装饰器。""" @wraps(func) def wrapper(*args, **kwargs): start = time.perf_counter() result = func(*args, **kwargs) elapsed = time.perf_counter() - start logger.info(f"{func.__name__} executed in {elapsed:.4f}s") return result return wrapper 9. 项目结构与工具配置 9.1 推荐项目结构 my_project/ ├── pyproject.toml # 项目配置(PEP 621) ├── README.md ├── CHANGELOG.md ├── src/ │ └── my_project/ # 源代码(使用 src 布局) │ ├── __init__.py │ ├── models/ # 数据模型层 │ ├── services/ # 业务逻辑层 │ ├── repositories/ # 数据访问层 │ ├── api/ # 接口层(FastAPI/Flask) │ ├── utils/ # 工具函数 │ └── config.py # 配置管理 ├── tests/ │ ├── unit/ # 单元测试 │ ├── integration/ # 集成测试 │ └── conftest.py # pytest 配置 ├── docs/ # 文档 ├── scripts/ # 脚本工具 └── docker/ 9.2 工具链配置(pyproject.toml) [project] name = "my-project" version = "1.0.0" requires-python = ">=3.11" dependencies = [ "fastapi>=0.100.0", "pydantic>=2.0", "sqlalchemy>=2.0", ] [project.optional-dependencies] dev = [ "pytest>=7.0", "pytest-asyncio", "pytest-cov", "black", "ruff", "mypy", "pre-commit", ] # Black 格式化配置 [tool.black] line-length = 100 target-version = ['py311'] include = '\.pyi?$' # Ruff 配置(替代 flake8 + isort) [tool.ruff] line-length = 100 target-version = "py311" select = [ "E", # pycodestyle errors "F", # Pyflakes "I", # isort "N", # pep8-naming "W", # pycodestyle warnings "UP", # pyupgrade "B", # flake8-bugbear "C4", # flake8-comprehensions "SIM", # flake8-simplify ] ignore = ["E501"] # 行长度由 Black 处理 [tool.ruff.pydocstyle] convention = "google" # MyPy 类型检查配置 [tool.mypy] python_version = "3.11" strict = true warn_return_any = true warn_unused_configs = true disallow_untyped_defs = true disallow_incomplete_defs = true check_untyped_defs = true disallow_untyped_decorators = false no_implicit_optional = true warn_redundant_casts = true warn_unused_ignores = true warn_no_return = true warn_unreachable = true # Pytest 配置 [tool.pytest.ini_options] testpaths = ["tests"] python_files = ["test_*.py", "*_test.py"] python_classes = ["Test*"] python_functions = ["test_*"] addopts = "-v --cov=src --cov-report=term-missing --cov-report=html" 10. 安全规范 import secrets import hashlib from typing import Final # ✅ 敏感信息处理 class SecurityConfig: """安全配置类。""" # 使用 Final 标记常量 PASSWORD_MIN_LENGTH: Final[int] = 12 TOKEN_EXPIRY_HOURS: Final[int] = 24 @staticmethod def hash_password(password: str) -> str: """安全地哈希密码(使用系统提供的算法)。""" import bcrypt salt = bcrypt.gensalt(rounds=12) return bcrypt.hashpw(password.encode(), salt).decode() @staticmethod def generate_secure_token(length: int = 32) -> str: """生成加密安全随机令牌。""" return secrets.token_urlsafe(length) # ✅ 输入验证与转义 from markupsafe import escape def render_user_content(user_input: str) -> str: """安全渲染用户输入(防止 XSS)。""" # 转义 HTML 特殊字符 safe_content = escape(user_input) return f"<div class='user-content'>{safe_content}</div>" # ✅ SQL 注入防护(使用参数化查询) def get_user_by_email(email: str) -> User | None: """根据邮箱查询用户(安全方式)。""" # ❌ 永远不要:f"SELECT * FROM users WHERE email = '{email}'" # ✅ 正确方式: query = "SELECT * FROM users WHERE email = :email" result = db.execute(query, {"email": email}) return result.fetchone() 11. 代码审查清单 提交 PR 前请自检: 风格:通过 black 和 ruff 检查 类型:mypy 无错误 测试:新增代码有对应测试,且全部通过 文档:公共 API 有完整 docstring 性能:无明显的性能陷阱(N+1 查询、内存泄漏等) 安全:无硬编码密钥、SQL 注入风险、XSS 漏洞 兼容性:不破坏现有 API 契约 日志:关键操作有适当的日志记录 12. 演进原则 向后兼容:公共 API 变更需遵循 SemVer,废弃功能需标记 DeprecationWarning 渐进增强:新代码必须遵循本规范,旧代码重构时逐步更新 工具优先:优先使用自动化工具(Black、Ruff、MyPy)而非人工审查风格问题 实用主义:规范服务于代码质量,紧急情况下可技术债务标记(# TODO: TECH DEBT - 原因) X 参考文献