You've already forked FrameTour-RenderWorker
92 lines
2.4 KiB
Python
92 lines
2.4 KiB
Python
"""
|
|
JSON处理工具模块 - 提供安全的JSON解析和处理功能
|
|
"""
|
|
import json
|
|
import logging
|
|
from typing import Dict, Any, Optional, Union
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
def safe_json_loads(json_str: Union[str, bytes], default: Any = None) -> Any:
|
|
"""
|
|
安全解析JSON字符串
|
|
|
|
Args:
|
|
json_str: JSON字符串
|
|
default: 解析失败时返回的默认值
|
|
|
|
Returns:
|
|
解析后的对象,或默认值
|
|
"""
|
|
if not json_str or json_str == '{}':
|
|
return default or {}
|
|
|
|
try:
|
|
return json.loads(json_str)
|
|
except (json.JSONDecodeError, TypeError) as e:
|
|
logger.warning(f"Failed to parse JSON: {e}, input: {json_str}")
|
|
return default or {}
|
|
|
|
def safe_json_dumps(obj: Any, indent: Optional[int] = None, ensure_ascii: bool = False) -> str:
|
|
"""
|
|
安全序列化对象为JSON字符串
|
|
|
|
Args:
|
|
obj: 要序列化的对象
|
|
indent: 缩进空格数
|
|
ensure_ascii: 是否确保ASCII编码
|
|
|
|
Returns:
|
|
JSON字符串
|
|
"""
|
|
try:
|
|
return json.dumps(obj, indent=indent, ensure_ascii=ensure_ascii)
|
|
except (TypeError, ValueError) as e:
|
|
logger.error(f"Failed to serialize to JSON: {e}")
|
|
return "{}"
|
|
|
|
def get_nested_value(data: Dict[str, Any], key_path: str, default: Any = None) -> Any:
|
|
"""
|
|
从嵌套字典中安全获取值
|
|
|
|
Args:
|
|
data: 字典数据
|
|
key_path: 键路径,用点分隔(如 "user.profile.name")
|
|
default: 默认值
|
|
|
|
Returns:
|
|
找到的值或默认值
|
|
"""
|
|
if not isinstance(data, dict):
|
|
return default
|
|
|
|
try:
|
|
keys = key_path.split('.')
|
|
current = data
|
|
|
|
for key in keys:
|
|
if isinstance(current, dict) and key in current:
|
|
current = current[key]
|
|
else:
|
|
return default
|
|
|
|
return current
|
|
except Exception as e:
|
|
logger.warning(f"Failed to get nested value for path '{key_path}': {e}")
|
|
return default
|
|
|
|
def merge_dicts(*dicts: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""
|
|
合并多个字典,后面的字典会覆盖前面的字典中相同的键
|
|
|
|
Args:
|
|
*dicts: 要合并的字典
|
|
|
|
Returns:
|
|
合并后的字典
|
|
"""
|
|
result = {}
|
|
for d in dicts:
|
|
if isinstance(d, dict):
|
|
result.update(d)
|
|
return result |