Python编码规范中,有哪些细节需要注意?
摘要:0 序 基于 AI 整理了一份全面的企业级 Python 编码规范。这份规范将基于 PEP 8、Google Python Style Guide 等业界主流标准,结合现代 Python 开发实践,涵盖代码风格、类型提示、文档规范、测试要求
0 序
基于 AI 整理了一份全面的企业级 Python 编码规范。这份规范将基于 PEP 8、Google Python Style Guide 等业界主流标准,结合现代 Python 开发实践,涵盖代码风格、类型提示、文档规范、测试要求等关键方面。
1 企业级 Python 编码规范
1. 代码风格基础
1.1 缩进与换行
强制要求:
使用 4 个空格 缩进,禁止使用 Tab
每行最大长度 100 字符(比 PEP 8 的 79 更宽松,适应现代显示器)
续行使用括号隐式连接,或使用反斜杠 \(仅在括号内不可行时)
# ✅ 正确:使用括号隐式连接
result = some_function(
very_long_argument_name_one,
very_long_argument_name_two,
very_long_argument_name_three,
)
# ✅ 正确:条件表达式换行
if( user.is_active
and user.has_permission('admin')
and not user.is_suspended ):
process_admin_action(user)
# ❌ 错误:使用反斜杠(除非必要)
result = some_function(very_long_argument_name_one, \
very_long_argument_name_two)
1.2 空行规范
# 模块级别:2 空行分隔顶级定义
import os
import sys
class DatabaseManager:
"""数据库管理器类"""
# 类内方法:1 空行分隔
def __init__(self):
self.connection = None
def connect(self):
"""建立连接。"""
pass
# 相关方法组之间可加 1 空行
def query(self):
"""执行查询。"""
pass
# 函数之间:2 空行
def helper_function():
pass
def another_helper():
pass
1.3 导入规范
# ✅ 正确:分组排序,每组之间空一行
# 1. 标准库
import os
import sys
from datetime import datetime
from typing import Optional, List
# 2. 第三方库
import pandas as pd
import numpy as np
from fastapi import FastAPI
from sqlalchemy import Column, Integer
# 3. 本地/项目导入
from myproject.models import User
from myproject.utils import format_date
# ❌ 错误:混合导入、未排序
import pandas as pd
import os
from myproject.models import User
import sys
导入规则:
绝对导入优先于相对导入
避免 from module import *
类型提示-only 的导入使用 TYPE_CHECKING 保护:
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from myproject.models import User # 仅在类型检查时导入
2. 命名规范
类型
规范
示例
模块/包
小写,下划线分隔
data_processor.py, ml_utils
类
大驼峰 (CapWords)
UserManager, HttpResponse
函数/方法
小写,下划线分隔
get_user_by_id, calculate_total
变量
小写,下划线分隔
user_count, is_active
常量
全大写,下划线分隔
MAX_RETRY_COUNT, DEFAULT_TIMEOUT
私有属性
单下划线前缀
_internal_cache, _validate_input()
强私有
双下划线前缀(避免使用)
__very_private
魔术方法
双下划线前后缀
__init__, __str__
# ✅ 命名示例
class PaymentGateway:
"""支付网关类。"""
TIMEOUT_SECONDS = 30 # 常量
_instance_count = 0 # 类级私有变量
def __init__(self, api_key: str):
self._api_key = api_key # 实例私有变量
self.is_connected = False # 公共属性
def process_payment(self, amount: float) -> str:
"""处理支付请求。"""
transaction_id = self._generate_transaction_id()
return transaction_id
def _generate_transaction_id(self) -> str:
"""内部方法:生成交易ID。"""
pass
3. 类型提示(强制要求)
所有公共 API 必须添加类型提示,使用 Python 3.9+ 内置泛型类型。
from typing import Optional, Union, Callable, Any
from collections.abc import Iterator, Sequence, Mapping
# ✅ 函数签名完整类型提示
def fetch_users(
user_ids: list[int],
include_deleted: bool = False,
timeout: float | None = None,
) -> dict[int, User]:
"""根据ID列表批量获取用户。
Args:
user_ids: 用户ID列表
include_deleted: 是否包含已删除用户
timeout: 请求超时时间(秒)
Returns:
用户ID到用户对象的映射字典
Raises:
ValueError: 当 user_ids 为空时
TimeoutError: 当请求超时时
"""
if not user_ids:
raise ValueError("user_ids cannot be empty")
# ...
# ✅ 复杂类型使用 TypeAlias(Python 3.10+)
from typing import TypeAlias
UserMapping: TypeAlias = dict[int, User]
ProcessorFunc: TypeAlias = Callable[[bytes], str]
# ✅ 泛型类定义
class Repository[T]:
"""通用数据仓库类。"""
def get(self, id: int) -> T | None:
"""根据ID获取实体。"""
pass
def list_all(self) -> list[T]:
"""获取所有实体列表。"""
pass
# ✅ Optional 使用 | None 语法(Python 3.10+)
def find_user(email: str) -> User | None:
pass
4. 文档字符串(Docstrings)
强制使用 Google Style 或 NumPy Style,统一团队选择。
Google Style 示例(推荐)
def create_order(
customer_id: int,
items: list[OrderItem],
coupon_code: str | None = None,
) -> Order:
"""创建新订单。
该方法会验证库存、计算价格、应用优惠券,并创建订单记录。
整个过程是原子性的,任何步骤失败都会回滚。
Args:
customer_id: 客户唯一标识
items: 订单商品列表,每个商品包含 sku 和 quantity
coupon_code: 可选的优惠券代码,用于折扣计算
Returns:
创建的订单对象,包含生成的订单号和最终价格
Raises:
CustomerNotFoundError: 当 customer_id 不存在时
InsufficientInventoryError: 当某个商品库存不足时
InvalidCouponError: 当优惠券无效或已过期时
Example:
>>> item = OrderItem(sku="SKU-001", quantity=2)
>>> order = create_order(12345, [item], "SAVE20")
>>> print(order.total_amount)
159.99
"""
pass
class PaymentProcessor:
"""处理支付请求的抽象基类。
实现了通用的支付流程,包括验证、风控检查、执行扣款和通知。
子类需要实现具体的支付渠道逻辑。
Attributes:
merchant_id: 商户ID,用于标识交易来源
sandbox_mode: 是否为沙箱环境,沙箱模式下不执行真实扣款
_logger: 内部使用的日志记录器实例
"""
def __init__(self, merchant_id: str, sandbox_mode: bool = False):
"""初始化处理器。
Args:
merchant_id: 商户唯一标识符
sandbox_mode: 是否启用沙箱模式
"""
self.merchant_id = merchant_id
self.sandbox_mode = sandbox_mode
self._logger = logging.getLogger(__name__)
5. 代码结构与设计
5.1 函数设计原则
# ✅ 单一职责,参数精简
def calculate_discounted_price(
original_price: float,
discount_rate: float,
) -> float:
"""计算折扣后价格。"""
if not 0 <= discount_rate <= 1:
raise ValueError("discount_rate must be between 0 and 1")
return original_price * (1 - discount_rate)
# ✅ 使用数据类封装相关参数(参数过多时)
from dataclasses import dataclass
@dataclass(frozen=True)
class PricingContext:
"""定价上下文对象。"""
base_price: float
discount_rate: float = 0.0
tax_rate: float = 0.0
coupon_code: str | None = None
def calculate_final_price(context: PricingContext) -> float:
"""根据上下文计算最终价格。"""
price = context.base_price * (1 - context.discount_rate)
price *= (1 + context.tax_rate)
# 应用优惠券逻辑...
return round(price, 2)
# ❌ 避免:参数过多、职责混杂
def process_payment_v1(
user_id: int,
amount: float,
currency: str,
method: str,
card_number: str,
cvv: str,
expiry: str,
save_card: bool,
send_email: bool,
email_template: str,
retry_count: int,
):
pass
5.2 类设计原则
from abc import ABC, abstractmethod
from typing import Protocol
# ✅ 优先使用组合而非继承
class Notifier:
"""通知发送器。"""
def __init__(self, channels: list[NotificationChannel]):
self._channels = channels
def send(self, message: str, priority: str = "normal") -> None:
"""向所有渠道发送通知。"""
for channel in self._channels:
if channel.is_available:
channel.send(message, priority)
# ✅ 使用 Protocol 定义接口(结构化子类型)
class NotificationChannel(Protocol):
"""通知渠道协议。"""
@property
def is_available(self) -> bool:
...
def send(self, message: str, priority: str) -> None:
...
# ✅ 抽象基类用于共享实现
class BaseRepository[T](ABC):
"""通用仓库抽象基类。"""
@abstractmethod
def get(self, id: int) -> T | None:
"""根据ID获取实体。"""
raise NotImplementedError
@abstractmethod
def save(self, entity: T) -> T:
"""保存实体。"""
raise NotImplementedError
def exists(self, id: int) -> bool:
"""检查实体是否存在(有默认实现)。"""
return self.get(id) is not None
5.3 错误处理
# ✅ 自定义异常层次
class BusinessError(Exception):
"""业务逻辑错误基类。"""
pass
class ValidationError(BusinessError):
"""数据验证错误。"""
def __init__(self, field: str, message: str):
self.field = field
self.message = message
super().__init__(f"Validation failed for '{field}': {message}")
class PaymentError(BusinessError):
"""支付处理错误。"""
def __init__(self, transaction_id: str, reason: str):
self.transaction_id = transaction_id
self.reason = reason
super().__init__(f"Payment {transaction_id} failed: {reason}")
# ✅ 异常处理最佳实践
def process_user_registration(email: str, password: str) -> User:
"""处理用户注册。
Raises:
ValidationError: 当输入数据验证失败时
DuplicateUserError: 当邮箱已被注册时
"""
try:
validate_email(email)
validate_password_strength(password)
except ValueError as e:
# 转换底层异常为业务异常
raise ValidationError("email", str(e)) from e
try:
return user_repository.create(email, password)
except IntegrityError as e:
raise DuplicateUserError(email) from e
# ✅ 使用上下文管理器确保资源释放
from contextlib import contextmanager
@contextmanager
def database_transaction():
"""提供数据库事务上下文。"""
conn = get_connection()
try:
yield conn
conn.commit()
except Exception:
conn.rollback()
raise
finally:
conn.close()
# 使用示例
with database_transaction() as conn:
conn.execute("INSERT INTO users ...")
conn.execute("INSERT INTO profiles ...")
6. 异步编程规范
import asyncio
from typing import AsyncIterator
from collections.abc import AsyncGenerator
# ✅ 异步函数命名加 async_ 前缀或使用语义明确的名称
async def async_fetch_data(url: str) -> bytes:
"""异步获取远程数据。"""
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.read()
# ✅ 异步生成器
async def stream_large_file(path: str) -> AsyncGenerator[bytes, None]:
"""异步流式读取大文件。"""
async with aiofiles.open(path, 'rb') as f:
while chunk := await f.read(8192):
yield chunk
# ✅ 异步上下文管理器
class AsyncDatabaseConnection:
"""异步数据库连接上下文管理器。"""
async def __aenter__(self) -> "AsyncDatabaseConnection":
self._conn = await create_connection()
return self
async def __aexit__(self, exc_type, exc, tb) -> None:
await self._conn.close()
async def query(self, sql: str) -> list[dict]:
"""执行查询。"""
return await self._conn.fetch(sql)
# ✅ 并发控制
async def process_batch(items: list[Item]) -> list[Result]:
"""批量处理,限制并发数。"""
semaphore = asyncio.Semaphore(10) # 最多10个并发
async def process_with_limit(item: Item) -> Result:
async with semaphore:
return await process_item(item)
return await asyncio.gather(*[process_with_limit(item) for item in items])
7. 测试规范
7.1 测试结构
# tests/test_user_service.py
import pytest
from unittest.mock import Mock, patch
from myproject.services import UserService
from myproject.models import User
class TestUserService:
"""UserService 测试套件。"""
@pytest.fixture
def user_repo(self):
"""创建模拟的用户仓库。"""
return Mock()
@pytest.fixture
def service(self, user_repo):
"""创建被测服务实例。"""
return UserService(user_repo)
def test_get_user_by_id_success(self, service, user_repo):
"""测试正常获取用户场景。"""
# Arrange
expected_user = User(id=1, email="test@example.com")
user_repo.get_by_id.return_value = expected_user
# Act
result = service.get_user_by_id(1)
# Assert
assert result == expected_user
user_repo.get_by_id.assert_called_once_with(1)
def test_get_user_by_id_not_found(self, service, user_repo):
"""测试用户不存在场景。"""
user_repo.get_by_id.return_value = None
with pytest.raises(UserNotFoundError) as exc_info:
service.get_user_by_id(999)
assert exc_info.value.user_id == 999
# ✅ 参数化测试
@pytest.mark.parametrize("input,expected", [
("user@example.com", True),
("invalid-email", False),
("", False),
("@example.com", False),
])
def test_email_validation(input: str, expected: bool):
"""测试邮箱格式验证。"""
assert validate_email_format(input) is expected
7.2 测试覆盖要求
行覆盖率:≥ 80%
分支覆盖率:≥ 70%
关键业务逻辑:必须 100% 覆盖
8. 性能与优化
from functools import lru_cache, wraps
from typing import Iterator
import time
# ✅ 缓存策略
class ProductService:
"""产品服务,带缓存优化。"""
@lru_cache(maxsize=128)
def get_category_tree(self, root_id: int) -> dict:
"""获取分类树(结果可缓存)。"""
# 复杂查询...
return tree
def get_hot_products(self) -> list[Product]:
"""获取热门产品(使用实例级缓存,带TTL)。"""
if self._is_cache_valid():
return self._product_cache
products = self._fetch_from_db()
self._update_cache(products)
return products
# ✅ 生成器处理大数据
def process_log_lines(file_path: str) -> Iterator[LogEntry]:
"""流式处理日志文件,避免内存爆炸。"""
with open(file_path, 'r', encoding='utf-8') as f:
for line in f:
yield parse_log_line(line)
# ✅ 性能监控装饰器
def timed(func: Callable) -> Callable:
"""记录函数执行时间的装饰器。"""
@wraps(func)
def wrapper(*args, **kwargs):
start = time.perf_counter()
result = func(*args, **kwargs)
elapsed = time.perf_counter() - start
logger.info(f"{func.__name__} executed in {elapsed:.4f}s")
return result
return wrapper
9. 项目结构与工具配置
9.1 推荐项目结构
my_project/
├── pyproject.toml # 项目配置(PEP 621)
├── README.md
├── CHANGELOG.md
├── src/
│ └── my_project/ # 源代码(使用 src 布局)
│ ├── __init__.py
│ ├── models/ # 数据模型层
│ ├── services/ # 业务逻辑层
│ ├── repositories/ # 数据访问层
│ ├── api/ # 接口层(FastAPI/Flask)
│ ├── utils/ # 工具函数
│ └── config.py # 配置管理
├── tests/
│ ├── unit/ # 单元测试
│ ├── integration/ # 集成测试
│ └── conftest.py # pytest 配置
├── docs/ # 文档
├── scripts/ # 脚本工具
└── docker/
9.2 工具链配置(pyproject.toml)
[project]
name = "my-project"
version = "1.0.0"
requires-python = ">=3.11"
dependencies = [
"fastapi>=0.100.0",
"pydantic>=2.0",
"sqlalchemy>=2.0",
]
[project.optional-dependencies]
dev = [
"pytest>=7.0",
"pytest-asyncio",
"pytest-cov",
"black",
"ruff",
"mypy",
"pre-commit",
]
# Black 格式化配置
[tool.black]
line-length = 100
target-version = ['py311']
include = '\.pyi?$'
# Ruff 配置(替代 flake8 + isort)
[tool.ruff]
line-length = 100
target-version = "py311"
select = [
"E", # pycodestyle errors
"F", # Pyflakes
"I", # isort
"N", # pep8-naming
"W", # pycodestyle warnings
"UP", # pyupgrade
"B", # flake8-bugbear
"C4", # flake8-comprehensions
"SIM", # flake8-simplify
]
ignore = ["E501"] # 行长度由 Black 处理
[tool.ruff.pydocstyle]
convention = "google"
# MyPy 类型检查配置
[tool.mypy]
python_version = "3.11"
strict = true
warn_return_any = true
warn_unused_configs = true
disallow_untyped_defs = true
disallow_incomplete_defs = true
check_untyped_defs = true
disallow_untyped_decorators = false
no_implicit_optional = true
warn_redundant_casts = true
warn_unused_ignores = true
warn_no_return = true
warn_unreachable = true
# Pytest 配置
[tool.pytest.ini_options]
testpaths = ["tests"]
python_files = ["test_*.py", "*_test.py"]
python_classes = ["Test*"]
python_functions = ["test_*"]
addopts = "-v --cov=src --cov-report=term-missing --cov-report=html"
10. 安全规范
import secrets
import hashlib
from typing import Final
# ✅ 敏感信息处理
class SecurityConfig:
"""安全配置类。"""
# 使用 Final 标记常量
PASSWORD_MIN_LENGTH: Final[int] = 12
TOKEN_EXPIRY_HOURS: Final[int] = 24
@staticmethod
def hash_password(password: str) -> str:
"""安全地哈希密码(使用系统提供的算法)。"""
import bcrypt
salt = bcrypt.gensalt(rounds=12)
return bcrypt.hashpw(password.encode(), salt).decode()
@staticmethod
def generate_secure_token(length: int = 32) -> str:
"""生成加密安全随机令牌。"""
return secrets.token_urlsafe(length)
# ✅ 输入验证与转义
from markupsafe import escape
def render_user_content(user_input: str) -> str:
"""安全渲染用户输入(防止 XSS)。"""
# 转义 HTML 特殊字符
safe_content = escape(user_input)
return f"<div class='user-content'>{safe_content}</div>"
# ✅ SQL 注入防护(使用参数化查询)
def get_user_by_email(email: str) -> User | None:
"""根据邮箱查询用户(安全方式)。"""
# ❌ 永远不要:f"SELECT * FROM users WHERE email = '{email}'"
# ✅ 正确方式:
query = "SELECT * FROM users WHERE email = :email"
result = db.execute(query, {"email": email})
return result.fetchone()
11. 代码审查清单
提交 PR 前请自检:
风格:通过 black 和 ruff 检查
类型:mypy 无错误
测试:新增代码有对应测试,且全部通过
文档:公共 API 有完整 docstring
性能:无明显的性能陷阱(N+1 查询、内存泄漏等)
安全:无硬编码密钥、SQL 注入风险、XSS 漏洞
兼容性:不破坏现有 API 契约
日志:关键操作有适当的日志记录
12. 演进原则
向后兼容:公共 API 变更需遵循 SemVer,废弃功能需标记 DeprecationWarning
渐进增强:新代码必须遵循本规范,旧代码重构时逐步更新
工具优先:优先使用自动化工具(Black、Ruff、MyPy)而非人工审查风格问题
实用主义:规范服务于代码质量,紧急情况下可技术债务标记(# TODO: TECH DEBT - 原因)
X 参考文献
