# Source code of module pycorrana.utils.large_data (源代码)

"""
大数据优化模块
==============
提供针对大数据集的采样、分块计算等优化策略。
"""

import warnings
from pathlib import Path
from typing import Any, Callable, List, Optional, Tuple, Union

import numpy as np
import pandas as pd


# Default row-count threshold for is_large_data / LargeDataConfig: data with
# strictly more rows than this is treated as "large".
LARGE_DATA_THRESHOLD_ROWS = 100_000
# Default memory threshold (MB, as computed by estimate_memory_usage) for
# is_large_data / LargeDataConfig.
LARGE_DATA_THRESHOLD_MEMORY_MB = 500
# Default target row count for smart_sample / LargeDataConfig, and the cap on
# the sample used by chunked_correlation for non-Pearson methods.
SAMPLE_SIZE_DEFAULT = 50_000
# Default rows per chunk for chunked_correlation / chunked_apply.
CHUNK_SIZE_DEFAULT = 10_000


def estimate_memory_usage(df: pd.DataFrame) -> float:
    """
    Return the memory footprint of ``df`` in megabytes (1 MB = 1024 * 1024 bytes).

    Uses ``DataFrame.memory_usage(deep=True)``, so the size of Python
    objects referenced by object-dtype columns is included, as is the index.

    Parameters
    ----------
    df : pd.DataFrame
        Data to measure.

    Returns
    -------
    float
        Total memory usage in MB.
    """
    bytes_per_mb = 1024 * 1024
    per_column_bytes = df.memory_usage(deep=True)
    total_bytes = per_column_bytes.sum()
    return total_bytes / bytes_per_mb


def is_large_data(df: pd.DataFrame,
                  threshold_rows: int = LARGE_DATA_THRESHOLD_ROWS,
                  threshold_memory_mb: float = LARGE_DATA_THRESHOLD_MEMORY_MB) -> bool:
    """
    Decide whether ``df`` should be treated as a large dataset.

    The row count is checked first; the memory estimate is only computed
    when the row threshold is not exceeded.

    Parameters
    ----------
    df : pd.DataFrame
        Data to inspect.
    threshold_rows : int
        Row count that must be strictly exceeded to count as large.
    threshold_memory_mb : float
        Memory footprint (MB, see ``estimate_memory_usage``) that must be
        strictly exceeded to count as large.

    Returns
    -------
    bool
        ``True`` if either threshold is exceeded, otherwise ``False``.
    """
    too_many_rows = len(df) > threshold_rows
    # bool(...) keeps the return a plain Python bool rather than numpy.bool_.
    return bool(too_many_rows or estimate_memory_usage(df) > threshold_memory_mb)


def smart_sample(df: pd.DataFrame,
                 sample_size: int = SAMPLE_SIZE_DEFAULT,
                 stratify_col: Optional[str] = None,
                 random_state: int = 42,
                 verbose: bool = True) -> pd.DataFrame:
    """
    Smart sampling: simple random sampling or proportional stratified sampling.

    Parameters
    ----------
    df : pd.DataFrame
        Input data.
    sample_size : int
        Target number of rows.
    stratify_col : str, optional
        Column to stratify on. Each group keeps ``round(frac * group_size)``
        rows with ``frac = sample_size / len(df)``, so the result size is
        approximately (not exactly) ``sample_size``. The column itself is
        kept in the result. Rows whose stratum value is missing are not
        sampled (pandas ``groupby`` default ``dropna=True``).
    random_state : int
        Random seed.
    verbose : bool
        Whether to print progress information.

    Returns
    -------
    pd.DataFrame
        The sampled rows with their original index, or ``df`` itself when it
        already has at most ``sample_size`` rows.

    Warns
    -----
    UserWarning
        When ``stratify_col`` is not a column of ``df`` or stratified sampling
        fails; the function then falls back to simple random sampling.
    """
    n = len(df)
    if n <= sample_size:
        if verbose:
            print(f"数据量 ({n}) 小于采样阈值 ({sample_size}),无需采样")
        return df

    if stratify_col and stratify_col not in df.columns:
        warnings.warn(
            f"分层列 '{stratify_col}' 不存在,改用随机采样",
            UserWarning,
            stacklevel=2,
        )
    elif stratify_col:
        try:
            # GroupBy.sample keeps every column (unlike apply with
            # include_groups=False, which dropped the stratum column) and
            # rounds per-group sizes instead of truncating them to zero.
            sample_df = df.groupby(stratify_col, observed=True).sample(
                frac=sample_size / n, random_state=random_state
            )
        except (TypeError, ValueError) as exc:
            # Best effort: report the failure instead of hiding it, then
            # fall back to random sampling below.
            warnings.warn(
                f"分层采样失败 ({exc}),改用随机采样",
                UserWarning,
                stacklevel=2,
            )
        else:
            if verbose:
                print(f"分层采样: {n} -> {len(sample_df)} 行 (分层列: {stratify_col})")
            return sample_df

    sample_df = df.sample(n=sample_size, random_state=random_state)
    if verbose:
        print(f"随机采样: {n} -> {sample_size} 行")
    return sample_df

def _chunked_pearson(df: pd.DataFrame, columns: List[str],
                     chunk_size: int, verbose: bool) -> np.ndarray:
    """Return the pairwise-complete Pearson matrix of ``df[columns]`` as an ndarray, processing rows chunk by chunk."""
    n_rows = len(df)
    n_cols = len(columns)
    # Pairwise sufficient statistics. Entry [j, k] only counts rows where
    # BOTH column j and column k are non-missing (DataFrame.corr semantics).
    n_pair = np.zeros((n_cols, n_cols))   # number of common rows
    sum_x = np.zeros((n_cols, n_cols))    # sum of column j over those rows
    sum_x2 = np.zeros((n_cols, n_cols))   # sum of column j squared over those rows
    sum_xy = np.zeros((n_cols, n_cols))   # sum of column j * column k
    # Per-column shift = first observed value. Shifting leaves covariances
    # unchanged but avoids catastrophic cancellation in sum(x^2) - n*mean^2
    # when the mean is large relative to the spread.
    shift = np.full(n_cols, np.nan)

    for start in range(0, n_rows, chunk_size):
        # to_numpy with na_value also handles nullable dtypes (pd.NA).
        values = df.iloc[start:start + chunk_size][columns].to_numpy(
            dtype=np.float64, na_value=np.nan
        )
        valid = ~np.isnan(values)

        # A column's shift can be fixed the first time it has data: all of its
        # previously accumulated statistics are zero, so nothing is invalidated.
        unset = np.isnan(shift) & valid.any(axis=0)
        if unset.any():
            first_row = valid.argmax(axis=0)
            cols = np.flatnonzero(unset)
            shift[cols] = values[first_row[cols], cols]

        centred = np.where(valid, values - shift, 0.0)
        weights = valid.astype(np.float64)
        n_pair += weights.T @ weights
        sum_x += centred.T @ weights
        sum_x2 += (centred * centred).T @ weights
        sum_xy += centred.T @ centred

        if verbose and (start // chunk_size) % 10 == 0:
            print(f" 处理进度: {min(start + chunk_size, n_rows)}/{n_rows} 行")

    with np.errstate(divide='ignore', invalid='ignore'):
        # sum_x.T[j, k] is the sum of column k over the rows common to j and k.
        cov = sum_xy - sum_x * sum_x.T / n_pair
        var = sum_x2 - sum_x ** 2 / n_pair
        corr = cov / np.sqrt(var * var.T)
    defined = (n_pair > 1) & (var > 0) & (var.T > 0)
    # Undefined pairs keep the historical value 0; clip rounding overshoot.
    corr = np.where(defined, np.clip(corr, -1.0, 1.0), 0.0)
    np.fill_diagonal(corr, 1.0)
    return corr


def chunked_correlation(df: pd.DataFrame,
                        columns: Optional[List[str]] = None,
                        method: str = 'pearson',
                        chunk_size: int = CHUNK_SIZE_DEFAULT,
                        verbose: bool = True) -> pd.DataFrame:
    """
    Compute a correlation matrix chunk by chunk (for large datasets).

    For ``method='pearson'`` the rows are processed in chunks of
    ``chunk_size`` while pairwise sufficient statistics are accumulated.
    Missing values are handled pairwise: each pair of columns uses only the
    rows where both are present, as ``DataFrame.corr`` does. Any other
    method is computed with ``DataFrame.corr`` on a random sample of at most
    ``SAMPLE_SIZE_DEFAULT`` rows; unknown method names fall back to Pearson
    on that sample.

    Parameters
    ----------
    df : pd.DataFrame
        Input data.
    columns : list, optional
        Columns to correlate; defaults to all numeric columns.
    method : str
        'pearson', 'spearman' or 'kendall'.
    chunk_size : int
        Rows per chunk (Pearson only); must be >= 1.
    verbose : bool
        Whether to print progress.

    Returns
    -------
    pd.DataFrame
        Correlation matrix indexed by ``columns``. For Pearson, the diagonal
        is 1, and pairs with fewer than two common observations or zero
        variance are reported as 0.

    Raises
    ------
    ValueError
        If ``method='pearson'`` and ``chunk_size`` is smaller than 1.
    """
    if columns is None:
        columns = df.select_dtypes(include=[np.number]).columns.tolist()
    n_rows = len(df)

    if method != 'pearson':
        sample_size = min(SAMPLE_SIZE_DEFAULT, n_rows)
        if verbose:
            print(f"非Pearson方法使用采样计算 (样本量: {sample_size})")
        sample_df = df.sample(n=sample_size, random_state=42)
        if method in ('spearman', 'kendall'):
            return sample_df[columns].corr(method=method)
        return sample_df[columns].corr()

    if chunk_size < 1:
        raise ValueError(f"chunk_size must be >= 1, got {chunk_size}")
    corr_matrix = _chunked_pearson(df, columns, chunk_size, verbose)
    return pd.DataFrame(corr_matrix, index=columns, columns=columns)

def chunked_apply(df: pd.DataFrame,
                  func: Callable,
                  chunk_size: int = CHUNK_SIZE_DEFAULT,
                  combine_func: Optional[Callable] = None,
                  verbose: bool = True,
                  **kwargs) -> Any:
    """
    Apply ``func`` to consecutive row chunks of ``df`` and collect the results.

    Parameters
    ----------
    df : pd.DataFrame
        Input data.
    func : callable
        Called as ``func(chunk, **kwargs)`` for each chunk (presumably
        returning a dict, per the original contract — not enforced here).
    chunk_size : int
        Rows per chunk; must be >= 1.
    combine_func : callable, optional
        If given (and truthy), called once with the list of per-chunk results.
    verbose : bool
        Whether to print progress after every chunk.
    **kwargs
        Extra keyword arguments forwarded to ``func``.

    Returns
    -------
    Any
        ``combine_func(results)`` when ``combine_func`` is given, otherwise
        the list of per-chunk results in row order (empty for an empty ``df``).

    Raises
    ------
    ValueError
        If ``chunk_size`` is smaller than 1 (a negative size previously
        processed nothing without any error).
    """
    if chunk_size < 1:
        raise ValueError(f"chunk_size must be >= 1, got {chunk_size}")

    n_rows = len(df)
    results = []
    for start in range(0, n_rows, chunk_size):
        results.append(func(df.iloc[start:start + chunk_size], **kwargs))
        if verbose:
            print(f" 处理进度: {min(start + chunk_size, n_rows)}/{n_rows} 行")

    if combine_func:
        return combine_func(results)
    return results

def _smallest_int_dtype(col_min, col_max) -> Optional[str]:
    """Return the narrowest (u)int8/16/32 dtype name holding [col_min, col_max], or None."""
    if pd.isna(col_min):  # empty column: min()/max() are NaN, leave it alone
        return None
    if col_min >= 0:
        candidates = ('uint8', 'uint16', 'uint32')
    else:
        candidates = ('int8', 'int16', 'int32')
    for name in candidates:
        info = np.iinfo(name)
        if info.min <= col_min and col_max <= info.max:
            return name
    return None


def _fits_float32(series: pd.Series) -> bool:
    """Return True if every finite value of a float64 series is within float32 range."""
    values = series.to_numpy()
    finite = values[np.isfinite(values)]
    return finite.size == 0 or bool(np.abs(finite).max() <= np.finfo(np.float32).max)


def optimize_dataframe(df: pd.DataFrame, verbose: bool = True) -> pd.DataFrame:
    """
    Reduce the memory footprint of a DataFrame by downcasting column dtypes.

    The rules are applied to a copy; ``df`` itself is not modified:

    * ``int64`` columns become the narrowest unsigned (when min >= 0) or
      signed 8/16/32-bit integer type whose range holds the column's values;
    * ``float64`` columns become ``float32`` when every finite value fits in
      float32's range, so large values are never turned into ``inf``. This
      still keeps only about 7 significant digits;
    * ``object`` columns with fewer than 50% distinct values become
      ``category`` (skipped for empty frames and unhashable values).

    Parameters
    ----------
    df : pd.DataFrame
        Input data.
    verbose : bool
        Whether to print the memory before and after.

    Returns
    -------
    pd.DataFrame
        The optimised copy.
    """
    start_memory = estimate_memory_usage(df)
    df_optimized = df.copy()
    n_rows = len(df_optimized)

    for col in df_optimized.columns:
        series = df_optimized[col]
        col_type = series.dtype
        if col_type == 'int64':
            target = _smallest_int_dtype(series.min(), series.max())
            if target is not None:
                df_optimized[col] = series.astype(target)
        elif col_type == 'float64':
            if _fits_float32(series):
                df_optimized[col] = series.astype('float32')
        elif col_type == 'object' and n_rows > 0:
            try:
                unique_ratio = series.nunique() / n_rows
            except TypeError:
                # Unhashable values (e.g. lists) cannot be categorised.
                continue
            if unique_ratio < 0.5:
                df_optimized[col] = series.astype('category')

    end_memory = estimate_memory_usage(df_optimized)
    if verbose:
        reduction = (1 - end_memory / start_memory) * 100 if start_memory > 0 else 0.0
        print(f"内存优化: {start_memory:.1f}MB -> {end_memory:.1f}MB (减少 {reduction:.1f}%)")
    return df_optimized

class LargeDataConfig:
    """
    Configuration for large-dataset handling (dtype optimisation and sampling).

    Examples
    --------
    >>> config = LargeDataConfig(  # doctest: +SKIP
    ...     sample_size=100000,
    ...     chunk_size=50000,
    ...     auto_sample=True
    ... )
    >>> analyzer = CorrAnalyzer(df, large_data_config=config)  # doctest: +SKIP
    """

    def __init__(self,
                 sample_size: int = SAMPLE_SIZE_DEFAULT,
                 chunk_size: int = CHUNK_SIZE_DEFAULT,
                 auto_sample: bool = True,
                 auto_optimize: bool = True,
                 threshold_rows: int = LARGE_DATA_THRESHOLD_ROWS,
                 threshold_memory_mb: float = LARGE_DATA_THRESHOLD_MEMORY_MB,
                 stratify_column: Optional[str] = None,
                 random_state: int = 42,
                 verbose: bool = True):
        """
        Parameters
        ----------
        sample_size : int
            Target row count passed to ``smart_sample``.
        chunk_size : int
            Stored for callers; not read by the methods of this class.
        auto_sample : bool
            Whether ``prepare_data`` samples data judged large.
        auto_optimize : bool
            Whether ``prepare_data`` runs ``optimize_dataframe`` first.
        threshold_rows : int
            Row threshold passed to ``is_large_data``.
        threshold_memory_mb : float
            Memory threshold (MB) passed to ``is_large_data``.
        stratify_column : str, optional
            Column used for stratified sampling.
        random_state : int
            Seed passed to ``smart_sample``.
        verbose : bool
            Whether helper functions print progress.
        """
        self.sample_size = sample_size
        self.chunk_size = chunk_size
        self.auto_sample = auto_sample
        self.auto_optimize = auto_optimize
        self.threshold_rows = threshold_rows
        self.threshold_memory_mb = threshold_memory_mb
        self.stratify_column = stratify_column
        self.random_state = random_state
        self.verbose = verbose

    def should_optimize(self, df: pd.DataFrame) -> bool:
        """Return True if ``df`` exceeds the configured row or memory threshold."""
        return is_large_data(df, self.threshold_rows, self.threshold_memory_mb)

    def prepare_data(self, df: pd.DataFrame) -> Tuple[pd.DataFrame, bool]:
        """
        Prepare data according to the configuration: optimise, then sample.

        Returns
        -------
        tuple
            ``(processed_data, sampled)``. ``processed_data`` never shares
            storage with ``df``. ``sampled`` is True only if rows were
            actually dropped.
        """
        if self.auto_optimize:
            # optimize_dataframe already works on a copy; copying here as
            # well would duplicate a (potentially huge) frame for nothing.
            result_df = optimize_dataframe(df, verbose=self.verbose)
        else:
            result_df = df.copy()

        sampled = False
        if self.auto_sample and self.should_optimize(result_df):
            n_before = len(result_df)
            result_df = smart_sample(
                result_df,
                sample_size=self.sample_size,
                stratify_col=self.stratify_column,
                random_state=self.random_state,
                verbose=self.verbose
            )
            # smart_sample returns its input unchanged when it has at most
            # sample_size rows (e.g. data that is "large" only by memory).
            sampled = len(result_df) < n_before

        return result_df, sampled

    def __repr__(self):
        return (f"LargeDataConfig(sample_size={self.sample_size}, "
                f"chunk_size={self.chunk_size}, auto_sample={self.auto_sample})")