Skip to content

小结和复习

掌握 Python 数据结构 —— 列表、字典、元组、集合的完整回顾


本章知识点总结

1. 四种核心数据结构对比

特性列表 (List)元组 (Tuple)字典 (Dict)集合 (Set)
符号[...](...){k:v, ...}{...}
有序✓✓✓(3.7+保持插入顺序)✗
可变✓✗✓✓
重复✓✓键唯一,值可重复✗
索引方式整数 list[0]整数 tuple[0]dict['key']无索引
典型用途有序集合不可变记录键值映射唯一值、集合运算

2. 列表 (List)

核心特点:有序、可变、允许重复

创建方式

python
# 方法 1: 直接创建
ages = [25, 30, 35, 40]

# 方法 2: range() 转换
numbers = list(range(10))  # [0, 1, 2, ..., 9]

# 方法 3: 列表推导式
squares = [x**2 for x in range(5)]  # [0, 1, 4, 9, 16]

常用操作

python
# 添加元素
ages.append(45)           # 末尾添加
ages.insert(0, 20)        # 指定位置插入
ages.extend([50, 55])     # 批量添加

# 删除元素
ages.remove(30)           # 删除第一个 30
ages.pop()                # 删除并返回最后一个
ages.pop(0)               # 删除并返回索引 0
del ages[1]               # 删除索引 1
ages.clear()              # 清空列表

# 查找
index = ages.index(35)    # 返回 35 的索引
count = ages.count(30)    # 统计 30 出现次数
exists = 30 in ages       # 检查是否存在

# 排序
ages.sort()               # 原地排序(升序)
ages.sort(reverse=True)   # 降序
sorted_ages = sorted(ages)  # 返回新列表
ages.reverse()            # 反转列表

# 切片
first_three = ages[:3]    # 前 3 个
last_two = ages[-2:]      # 后 2 个
every_second = ages[::2]  # 每隔一个

社科应用

python
# 存储样本 ID
sample_ids = [1001, 1002, 1003, 1004]

# 存储多个年份的数据
years = list(range(2010, 2021))  # [2010, 2011, ..., 2020]

# 筛选有效样本
valid_ages = [age for age in ages if 18 <= age <= 100]

3. 元组 (Tuple)

核心特点:有序、不可变、允许重复

创建方式

python
# 标准创建
coordinates = (10, 20)

# 单元素元组(注意逗号)
single = (42,)  # 有逗号
not_tuple = (42)  # 这只是整数,不是元组

# 元组解包
x, y = coordinates  # x=10, y=20

何时使用元组

  • 数据不应被修改(配置参数、常量)
  • 作为字典的键(列表不行)
  • 函数返回多个值
  • 性能要求高(比列表快)

实际应用

python
# 1. 固定配置
REGRESSION_CONFIG = ("OLS", 0.05, 1000)  # 模型类型、显著性、样本量

# 2. 函数返回多个值
def calculate_stats(data):
    """Return the mean, standard deviation and size of *data* as one tuple.

    NOTE(review): `mean` and `std` are not defined in this snippet; they are
    presumably imported elsewhere -- confirm before running.
    """
    center = mean(data)
    spread = std(data)
    return center, spread, len(data)

mean_val, std_val, n = calculate_stats(incomes)

# 3. 作为字典键
results = {
    ("Model1", "OLS"): 0.85,
    ("Model2", "Logit"): 0.78
}

# 4. 数据记录(不可变)
student = (1001, "Alice", 25, "Economics")  # ID, 姓名, 年龄, 专业

4. 字典 (Dictionary)

核心特点:键值对、无序(3.7+保持插入顺序)、键唯一

创建方式

python
# 方法 1: 直接创建
student = {"name": "Alice", "age": 25, "major": "Economics"}

# 方法 2: dict() 函数
student = dict(name="Alice", age=25, major="Economics")

# 方法 3: 字典推导式
squares = {x: x**2 for x in range(5)}  # {0:0, 1:1, 2:4, 3:9, 4:16}

# 方法 4: 从列表对创建
pairs = [("name", "Alice"), ("age", 25)]
student = dict(pairs)

常用操作

python
# 访问
name = student["name"]              # 直接访问(键不存在会报错)
name = student.get("name")          # 安全访问
name = student.get("nickname", "Unknown")  # 提供默认值

# 修改和添加
student["age"] = 26                 # 修改
student["gpa"] = 3.8               # 添加新键

# 删除
del student["age"]                  # 删除键值对
age = student.pop("age", None)      # 删除并返回,提供默认值

# 遍历
for key in student:                 # 遍历键
    print(key, student[key])

for key, value in student.items():  # 遍历键值对
    print(key, value)

for value in student.values():      # 遍历值
    print(value)

# 检查键是否存在
if "age" in student:
    print(student["age"])

# 合并字典
student.update({"gpa": 3.8, "year": 3})

社科应用

python
# 1. 存储个体数据
respondent = {
    "id": 1001,
    "age": 30,
    "income": 75000,
    "gender": "Female",
    "education": 16
}

# 2. 变量标签映射
var_labels = {
    "age": "年龄",
    "income": "年收入(元)",
    "edu": "受教育年限"
}

# 3. 回归结果
regression_results = {
    "coef": 5000.5,
    "std_err": 250.3,
    "t_value": 19.98,
    "p_value": 0.000,
    "r_squared": 0.65
}

# 4. 分组统计
income_by_gender = {
    "Male": 75000,
    "Female": 70000,
    "Other": 72500
}

5. 集合 (Set)

核心特点:无序、唯一、可变

创建方式

python
# 方法 1: 直接创建
unique_ids = {1001, 1002, 1003}

# 方法 2: set() 函数(从列表去重)
ids = [1001, 1002, 1003, 1001, 1002]
unique_ids = set(ids)  # {1001, 1002, 1003}

# 注意:空集合必须用 set()
empty_set = set()      # 空集合
empty_dict = {}        # 空字典(不是空集合!)

集合运算

python
group_a = {1, 2, 3, 4, 5}
group_b = {4, 5, 6, 7, 8}

# 并集(所有元素)
union = group_a | group_b          # {1, 2, 3, 4, 5, 6, 7, 8}
union = group_a.union(group_b)

# 交集(共同元素)
intersection = group_a & group_b   # {4, 5}
intersection = group_a.intersection(group_b)

# 差集(A 有但 B 没有)
difference = group_a - group_b     # {1, 2, 3}
difference = group_a.difference(group_b)

# 对称差(只在一个集合中)
sym_diff = group_a ^ group_b       # {1, 2, 3, 6, 7, 8}

常用操作

python
# 添加元素
unique_ids.add(1004)
unique_ids.update([1005, 1006])

# 删除元素
unique_ids.remove(1001)    # 不存在会报错
unique_ids.discard(1001)   # 不存在不报错

# 成员检测(非常快!)
if 1001 in unique_ids:
    print("存在")

社科应用

python
# 1. 数据去重
all_respondent_ids = [1001, 1002, 1003, 1001, 1004, 1002]
unique_ids = set(all_respondent_ids)

# 2. 样本匹配(找交集)
treatment_group = {1001, 1002, 1003, 1004}
control_group = {1003, 1004, 1005, 1006}
matched_sample = treatment_group & control_group  # {1003, 1004}

# 3. 找出只在处理组的样本
treatment_only = treatment_group - control_group  # {1001, 1002}

# 4. 快速检查 ID 是否存在(比列表快)
valid_ids = set(range(1000, 2000))
if respondent_id in valid_ids:
    print("有效 ID")

选择指南速查表

需求推荐数据结构原因
存储有序的成绩List需要保持顺序
函数返回多个统计量Tuple不可变、轻量
存储学生 ID → 信息Dict快速查找
删除重复的 IDSet自动去重
需要频繁修改的序列List可变
配置参数(不应修改)Tuple不可变
变量名映射Dict键值对应
找两组样本的交集Set集合运算

Python vs Stata vs R 对比

列表操作

操作PythonStataR
创建序列list(range(10))gen id = _n1:10
添加元素list.append(x)replacec(list, x)
删除元素list.remove(x)drop iflist[-index]
切片list[1:3]in 1/3list[1:3]

字典/映射

操作PythonStataR
创建映射{"a": 1, "b": 2}Label valueslist(a=1, b=2)
访问值dict["a"]N/Alist$a
遍历for k, v in dict.items()N/Alapply(list, ...)

集合操作

操作PythonStataR
去重set(list)duplicates dropunique(vector)
交集set_a & set_bmerge + keep if _merge==3intersect(a, b)
并集set_a | set_bappendunion(a, b)
差集set_a - set_bmerge + keep if _merge==1setdiff(a, b)

️ 易错点和最佳实践

易错点 1: 列表索引从 0 开始

python
#  常见错误(以为从 1 开始)
ages = [25, 30, 35, 40]
first = ages[1]  # 这是第 2 个元素!实际是 30

#  正确
first = ages[0]   # 25(第 1 个)
last = ages[-1]   # 40(最后一个)

易错点 2: 修改列表的副作用

python
#  错误(引用赋值陷阱)
original = [1, 2, 3]
copy = original      # 这不是拷贝,是引用!
copy.append(4)
print(original)      # [1, 2, 3, 4](原列表也被修改了!)

#  正确(浅拷贝,适用于非嵌套列表)
copy = original.copy()  # 方法 1
copy = original[:]      # 方法 2
copy = list(original)   # 方法 3

import copy
deep_copy = copy.deepcopy(nested_list)  # 嵌套列表用这个

易错点 3: 单元素元组的逗号

python
#  错误(不是元组)
not_tuple = (42)
print(type(not_tuple))  # <class 'int'>

#  正确(必须有逗号)
is_tuple = (42,)
print(type(is_tuple))   # <class 'tuple'>

易错点 4: 字典键不存在

python
student = {"name": "Alice", "age": 25}

#  错误(键不存在会报错)
gpa = student["gpa"]  # KeyError: 'gpa'

#  正确(安全访问)
gpa = student.get("gpa", 0.0)  # 不存在返回 0.0

易错点 5: 集合是无序的

python
#  错误(期望保持顺序)
ids = {1003, 1001, 1002}
print(ids)  # {1001, 1002, 1003}(可能是任意顺序!)

#  正确(需要顺序用列表)
ids = [1003, 1001, 1002]  # 保持插入顺序
unique_ids = []
seen = set()
for id in ids:
    if id not in seen:
        unique_ids.append(id)
        seen.add(id)

最佳实践 1: 使用推导式

python
#  不够优雅
squares = []
for x in range(10):
    squares.append(x ** 2)

#  更优雅(列表推导式)
squares = [x ** 2 for x in range(10)]

#  字典推导式
id_to_age = {id: age for id, age in zip(ids, ages)}

#  集合推导式
unique_squares = {x ** 2 for x in range(-5, 6)}

最佳实践 2: 善用 get() 和 setdefault()

python
# 统计词频
word_count = {}

#  不够优雅
for word in words:
    if word in word_count:
        word_count[word] += 1
    else:
        word_count[word] = 1

#  更优雅
for word in words:
    word_count[word] = word_count.get(word, 0) + 1

#  或使用 defaultdict
from collections import defaultdict
word_count = defaultdict(int)
for word in words:
    word_count[word] += 1

最佳实践 3: 合理选择数据结构

python
# 场景:需要快速检查 ID 是否存在

#  使用列表(慢,O(n))
valid_ids = [1001, 1002, 1003, ..., 2000]  # 1000 个 ID
if respondent_id in valid_ids:  # 需要遍历整个列表
    pass

#  使用集合(快,O(1))
valid_ids = set(range(1001, 2001))  # 集合
if respondent_id in valid_ids:  # 立即查找
    pass

综合练习题

基础巩固题(1-3题)

练习 1: 问卷数据去重与排序

题目描述: 处理问卷调查中的重复样本,并按 ID 排序。

要求

  1. 删除重复的样本 ID
  2. 按 ID 升序排序
  3. 返回去重后的 ID 列表和删除的重复数量

输入输出示例

python
sample_ids = [1003, 1001, 1004, 1001, 1002, 1003, 1005, 1002]
unique_ids, duplicate_count = remove_duplicates(sample_ids)

print(unique_ids)          # [1001, 1002, 1003, 1004, 1005]
print(duplicate_count)     # 3
提示
  1. 使用 set() 去重
  2. 使用 sorted() 排序
  3. 计算原始数量 - 去重后数量
参考答案
python
def remove_duplicates(sample_ids):
    """
    Deduplicate sample IDs and return them in ascending order.

    Parameters:
        sample_ids (list): sample IDs, possibly containing repeats

    Returns:
        tuple: (sorted list of distinct IDs, number of entries dropped)
    """
    # A set keeps one copy of each ID; sorting restores a stable order.
    distinct = list(set(sample_ids))
    distinct.sort()

    dropped = len(sample_ids) - len(distinct)
    return distinct, dropped

# 方法 2: 保持原始顺序去重
def remove_duplicates_keep_order(sample_ids):
    """
    Deduplicate sample IDs while keeping first-occurrence order.

    Parameters:
        sample_ids (list): sample IDs, possibly containing repeats

    Returns:
        tuple: (distinct IDs in order of first appearance,
                number of entries dropped)
    """
    # dict keys are unique and keep insertion order (Python 3.7+), so
    # fromkeys() performs the ordered de-duplication in a single pass.
    unique_ids = list(dict.fromkeys(sample_ids))
    duplicate_count = len(sample_ids) - len(unique_ids)
    return unique_ids, duplicate_count

# 测试
sample_ids = [1003, 1001, 1004, 1001, 1002, 1003, 1005, 1002]

# 方法 1: 去重并排序
unique_ids, dup_count = remove_duplicates(sample_ids)
print(f"去重后(排序): {unique_ids}")
print(f"删除了 {dup_count} 个重复样本")

# 方法 2: 保持顺序去重
unique_ids2, dup_count2 = remove_duplicates_keep_order(sample_ids)
print(f"去重后(保持顺序): {unique_ids2}")

练习 2: 词频统计

题目描述: 统计文本中每个单词出现的次数。

要求

  1. 不区分大小写
  2. 返回字典,键为单词,值为出现次数
  3. 按频率降序排列(返回列表)

输入输出示例

python
text = "Python is great. Python is powerful. Python is popular."
word_freq = count_words(text)

print(word_freq)
# {'python': 3, 'is': 3, 'great': 1, 'powerful': 1, 'popular': 1}

sorted_words = sort_by_frequency(word_freq)
print(sorted_words)
# [('python', 3), ('is', 3), ('great', 1), ('powerful', 1), ('popular', 1)]
参考答案
python
def count_words(text):
    """
    Count how often each word occurs in a text, ignoring case.

    Parameters:
        text (str): input text

    Returns:
        dict: word -> number of occurrences, keyed in first-seen order
    """
    import string

    # Strip ASCII punctuation first, then normalise case and tokenise
    # on whitespace.
    strip_table = str.maketrans('', '', string.punctuation)
    tokens = text.translate(strip_table).lower().split()

    tally = {}
    for token in tokens:
        if token in tally:
            tally[token] += 1
        else:
            tally[token] = 1

    return tally

def sort_by_frequency(word_freq):
    """
    Order the entries of a frequency dictionary from most to least frequent.

    Parameters:
        word_freq (dict): word -> count

    Returns:
        list: [(word, count), ...] in descending count order; entries with
        equal counts keep their order from the dictionary
    """
    ranked = list(word_freq.items())
    # In-place stable sort on the count (second element of each pair).
    ranked.sort(key=lambda pair: pair[1], reverse=True)
    return ranked

# 使用 Counter(更简单)
from collections import Counter

def count_words_v2(text):
    """Count case-insensitive word frequencies using collections.Counter."""
    import string

    strip_table = str.maketrans('', '', string.punctuation)
    tokens = text.translate(strip_table).lower().split()
    tally = Counter(tokens)
    return dict(tally)

def top_n_words(text, n=10):
    """Return the *n* most frequent words of *text* as (word, count) pairs."""
    import string
    from collections import Counter

    # Same normalisation as the other counters: drop ASCII punctuation,
    # lowercase, split on whitespace.
    cleaned = text.translate(str.maketrans('', '', string.punctuation))
    tally = Counter(cleaned.lower().split())

    return tally.most_common(n)

# 测试
text = "Python is great. Python is powerful. Python is popular. Python Python."
word_freq = count_words(text)
print("词频统计:", word_freq)

sorted_words = sort_by_frequency(word_freq)
print("\n按频率排序:")
for word, freq in sorted_words:
    print(f"  {word}: {freq}")

# 使用 Counter
print("\nTop 3 单词:")
print(top_n_words(text, n=3))

练习 3: 学生成绩管理

题目描述: 创建一个学生成绩管理系统,使用字典存储学生信息。

要求

  1. 添加学生成绩
  2. 查询学生成绩
  3. 计算班级平均分
  4. 找出最高分和最低分的学生

输入输出示例

python
grade_book = GradeBook()
grade_book.add_student("Alice", 95)
grade_book.add_student("Bob", 85)
grade_book.add_student("Carol", 90)

print(grade_book.get_grade("Alice"))  # 95
print(grade_book.class_average())     # 90.0
print(grade_book.top_student())       # ("Alice", 95)
print(grade_book.bottom_student())    # ("Bob", 85)
参考答案
python
class GradeBook:
    """Grade book that maps each student name to a single grade."""

    def __init__(self):
        """Create an empty grade book."""
        self.grades = dict()  # student name -> grade

    def add_student(self, name, grade):
        """
        Record a student's grade, replacing any previous one.

        Parameters:
            name (str): student name
            grade (float): grade to store
        """
        self.grades.update({name: grade})

    def get_grade(self, name):
        """
        Look up a student's grade.

        Parameters:
            name (str): student name

        Returns:
            float or None: the stored grade, or None for an unknown student
        """
        return self.grades[name] if name in self.grades else None

    def class_average(self):
        """
        Compute the mean grade of the class.

        Returns:
            float: the mean grade, or 0.0 when no student is recorded
        """
        headcount = len(self.grades)
        if headcount == 0:
            return 0.0

        return sum(self.grades.values()) / headcount

    def top_student(self):
        """
        Find the student with the highest grade.

        Returns:
            tuple: (name, grade), or None when the book is empty
        """
        if not self.grades:
            return None

        # max() keeps the first entry among ties, i.e. insertion order.
        return max(self.grades.items(), key=lambda entry: entry[1])

    def bottom_student(self):
        """Find the student with the lowest grade, or None when empty."""
        if not self.grades:
            return None

        return min(self.grades.items(), key=lambda entry: entry[1])

    def students_above_average(self):
        """Return a dict of the students whose grade exceeds the class mean."""
        threshold = self.class_average()
        above = {}
        for student, score in self.grades.items():
            if score > threshold:
                above[student] = score
        return above

    def grade_distribution(self):
        """
        Count how many grades fall into each letter band.

        Returns:
            dict: {'A': count, 'B': count, 'C': count, 'D': count, 'F': count}
        """
        # Lower bounds of each band, checked from the highest band down.
        bands = (('A', 90), ('B', 80), ('C', 70), ('D', 60))
        distribution = {'A': 0, 'B': 0, 'C': 0, 'D': 0, 'F': 0}

        for score in self.grades.values():
            for letter, lower_bound in bands:
                if score >= lower_bound:
                    distribution[letter] += 1
                    break
            else:
                # No band matched: the grade is below 60.
                distribution['F'] += 1

        return distribution

    def __str__(self):
        """Return a short human-readable summary."""
        headcount = len(self.grades)
        return f"GradeBook with {headcount} students"

# 测试
grade_book = GradeBook()
grade_book.add_student("Alice", 95)
grade_book.add_student("Bob", 85)
grade_book.add_student("Carol", 90)
grade_book.add_student("David", 75)
grade_book.add_student("Emma", 92)

print(f"Alice 的成绩: {grade_book.get_grade('Alice')}")
print(f"班级平均分: {grade_book.class_average():.1f}")
print(f"最高分: {grade_book.top_student()}")
print(f"最低分: {grade_book.bottom_student()}")
print(f"\n高于平均分的学生:")
for name, grade in grade_book.students_above_average().items():
    print(f"  {name}: {grade}")

print(f"\n成绩分布: {grade_book.grade_distribution()}")

综合应用题(4-7题)

练习 4: 问卷数据合并

题目描述: 合并两份问卷数据(基础信息和补充信息),根据受访者 ID 进行匹配。

要求

  1. 按 ID 合并两个字典列表
  2. 处理只在一份问卷中的样本
  3. 返回合并后的完整数据

输入输出示例

python
basic_info = [
    {"id": 1001, "name": "Alice", "age": 25},
    {"id": 1002, "name": "Bob", "age": 30},
    {"id": 1003, "name": "Carol", "age": 35}
]

supplement = [
    {"id": 1001, "income": 50000, "education": 16},
    {"id": 1002, "income": 75000, "education": 18},
    {"id": 1004, "income": 60000, "education": 14}  # 1004 不在基础信息中
]

merged = merge_surveys(basic_info, supplement, on="id")
# [
#     {"id": 1001, "name": "Alice", "age": 25, "income": 50000, "education": 16},
#     {"id": 1002, "name": "Bob", "age": 30, "income": 75000, "education": 18},
#     {"id": 1003, "name": "Carol", "age": 35, "income": None, "education": None},
#     {"id": 1004, "name": None, "age": None, "income": 60000, "education": 14}
# ]
参考答案
python
def merge_surveys(basic_info, supplement, on="id", how="outer"):
    """
    Join two lists of survey records on a shared key.

    Parameters:
        basic_info (list): records (dicts) from the basic questionnaire
        supplement (list): records (dicts) from the supplementary questionnaire
        on (str): key present in every record and used for matching
        how (str): "inner", "left", "right" or "outer"; any other value
            is treated as "outer"

    Returns:
        list: merged records sorted by the matching key. When a key value
        is missing from one source, that source's fields are added with
        None as a placeholder.
    """
    # Index both sources by the join key for O(1) lookups. If a key value
    # occurs more than once in a source, the last record wins.
    basic_by_key = {record[on]: record for record in basic_info}
    supp_by_key = {record[on]: record for record in supplement}

    # Field names of each source in first-seen order. They are computed
    # once here (not once per unmatched row) and kept ordered so that the
    # placeholder columns always appear in the same order.
    basic_fields = list(dict.fromkeys(
        field for record in basic_info for field in record if field != on
    ))
    supp_fields = list(dict.fromkeys(
        field for record in supplement for field in record if field != on
    ))

    # Decide which key values survive the join.
    if how == "inner":
        keys = basic_by_key.keys() & supp_by_key.keys()
    elif how == "left":
        keys = set(basic_by_key)
    elif how == "right":
        keys = set(supp_by_key)
    else:  # outer (also the fallback for unrecognised values)
        keys = basic_by_key.keys() | supp_by_key.keys()

    merged = []
    for key in sorted(keys):
        record = {on: key}

        if key in basic_by_key:
            record.update(basic_by_key[key])
        else:
            for field in basic_fields:
                record.setdefault(field, None)

        if key in supp_by_key:
            # Skip the join key itself; it is already in the record.
            record.update(
                (field, value)
                for field, value in supp_by_key[key].items()
                if field != on
            )
        else:
            # setdefault keeps a value already supplied by the basic record
            # when both sources share a field name.
            for field in supp_fields:
                record.setdefault(field, None)

        merged.append(record)

    return merged

def merge_statistics(basic_info, supplement, on="id"):
    """
    Summarise how the key values of two record lists overlap.

    Parameters:
        basic_info (list): records (dicts) from the basic questionnaire
        supplement (list): records (dicts) from the supplementary questionnaire
        on (str): key used for matching

    Returns:
        dict: counts of distinct key values found only in basic_info
        ("basic_only"), only in supplement ("supplement_only"), in both
        ("both") and in either ("total")
    """
    left = set(record[on] for record in basic_info)
    right = set(record[on] for record in supplement)

    only_left = left - right
    only_right = right - left
    shared = left & right
    everything = left | right

    return {
        "basic_only": len(only_left),
        "supplement_only": len(only_right),
        "both": len(shared),
        "total": len(everything)
    }

# 测试
basic_info = [
    {"id": 1001, "name": "Alice", "age": 25},
    {"id": 1002, "name": "Bob", "age": 30},
    {"id": 1003, "name": "Carol", "age": 35}
]

supplement = [
    {"id": 1001, "income": 50000, "education": 16},
    {"id": 1002, "income": 75000, "education": 18},
    {"id": 1004, "income": 60000, "education": 14}
]

# 外连接(保留所有)
merged = merge_surveys(basic_info, supplement, on="id", how="outer")
print("外连接(保留所有):")
for record in merged:
    print(f"  {record}")

# 内连接(只保留匹配的)
merged_inner = merge_surveys(basic_info, supplement, on="id", how="inner")
print(f"\n内连接(只保留匹配的): {len(merged_inner)} 条记录")

# 统计信息
stats = merge_statistics(basic_info, supplement, on="id")
print(f"\n合并统计:")
print(f"  只在基础信息: {stats['basic_only']}")
print(f"  只在补充信息: {stats['supplement_only']}")
print(f"  两者都有: {stats['both']}")
print(f"  总计: {stats['total']}")

练习 5: 分组聚合统计

题目描述: 对问卷数据按性别分组,计算各组的统计量。

要求

  1. 按性别分组
  2. 计算各组的人数、平均收入、平均年龄
  3. 返回字典格式的结果

输入输出示例

python
data = [
    {"id": 1001, "gender": "Male", "age": 25, "income": 50000},
    {"id": 1002, "gender": "Female", "age": 30, "income": 60000},
    {"id": 1003, "gender": "Male", "age": 35, "income": 75000},
    {"id": 1004, "gender": "Female", "age": 28, "income": 55000},
]

result = group_statistics(data, by="gender")
# {
#     'Male': {'count': 2, 'avg_age': 30.0, 'avg_income': 62500.0, ...},
#     'Female': {'count': 2, 'avg_age': 29.0, 'avg_income': 57500.0, ...}
# }
# 注:此处仅展示部分键;参考答案还会为每个数值字段(含 id)返回 avg_/min_/max_ 统计量。
参考答案
python
def group_statistics(data, by="gender", agg_fields=None):
    """
    Group records by one field and summarise numeric fields per group.

    Parameters:
        data (list): records (dicts); every record must contain `by`
        by (str): field to group on
        agg_fields (list): numeric fields to aggregate. None means
            auto-detect: every field other than `by` that holds an int or
            float (bools excluded) in at least one record.

    Returns:
        dict: {group value: {'count': n, 'avg_<f>': ..., 'min_<f>': ...,
        'max_<f>': ...}}. Records where a field is absent or None are
        left out of that field's statistics; a field with no usable value
        in a group produces no keys for that group.
    """
    if agg_fields is None:
        # Scan every record rather than only the first one, so a field
        # that is None/missing in the first row is still detected. bool is
        # a subclass of int and must be excluded explicitly. A dict is
        # used as an ordered set to keep first-seen field order.
        detected = {}
        for record in data:
            for key, value in record.items():
                if key == by or isinstance(value, bool):
                    continue
                if isinstance(value, (int, float)):
                    detected.setdefault(key, None)
        agg_fields = list(detected)

    # Bucket the records by group value, preserving first-seen group order.
    groups = {}
    for record in data:
        groups.setdefault(record[by], []).append(record)

    result = {}
    for group_key, members in groups.items():
        stats = {'count': len(members)}

        for field in agg_fields:
            # None marks a missing answer (e.g. a placeholder left by a
            # merge); it cannot be summed or compared, so skip it.
            values = [r[field] for r in members if r.get(field) is not None]
            if values:
                stats[f'avg_{field}'] = sum(values) / len(values)
                stats[f'min_{field}'] = min(values)
                stats[f'max_{field}'] = max(values)

        result[group_key] = stats

    return result

def pivot_table(data, index, columns, values, aggfunc='mean'):
    """
    Build a two-level pivot table from a list of records.

    Parameters:
        data (list): records (dicts)
        index (str): field whose values become the outer (row) keys
        columns (str): field whose values become the inner (column) keys
        values (str): field whose values are aggregated in each cell
        aggfunc (str or callable): 'mean', 'sum' or 'count', or a callable
            that takes the list of cell values and returns one value

    Returns:
        dict: nested dict {index_value: {column_value: aggregated_value}}

    Raises:
        ValueError: if aggfunc is neither a supported name nor callable
    """
    aggregators = {
        'mean': lambda cell: sum(cell) / len(cell),
        'sum': sum,
        'count': len,
    }

    # Resolve the aggregator before touching the data so that a typo in
    # aggfunc fails loudly instead of producing empty cells.
    if callable(aggfunc):
        reduce_cell = aggfunc
    else:
        try:
            reduce_cell = aggregators[aggfunc]
        except (KeyError, TypeError):
            raise ValueError(
                f"unsupported aggfunc {aggfunc!r}; expected one of "
                f"{sorted(aggregators)} or a callable"
            ) from None

    # Collect the raw values of every (row, column) cell.
    cells = {}
    for record in data:
        index_val = record[index]
        column_val = record[columns]
        value = record[values]
        cells.setdefault(index_val, {}).setdefault(column_val, []).append(value)

    # Reduce each cell to a single value.
    return {
        index_val: {
            column_val: reduce_cell(cell_values)
            for column_val, cell_values in row.items()
        }
        for index_val, row in cells.items()
    }

# 测试
data = [
    {"id": 1001, "gender": "Male", "age": 25, "income": 50000, "education": 16},
    {"id": 1002, "gender": "Female", "age": 30, "income": 60000, "education": 18},
    {"id": 1003, "gender": "Male", "age": 35, "income": 75000, "education": 16},
    {"id": 1004, "gender": "Female", "age": 28, "income": 55000, "education": 14},
    {"id": 1005, "gender": "Male", "age": 40, "income": 80000, "education": 20},
]

# 分组统计
result = group_statistics(data, by="gender")
print("按性别分组统计:")
for gender, stats in result.items():
    print(f"\n{gender}:")
    for key, value in stats.items():
        if isinstance(value, float):
            print(f"  {key}: {value:.2f}")
        else:
            print(f"  {key}: {value}")

# 透视表:性别 × 教育水平 → 平均收入
print("\n透视表(性别 × 教育水平 → 平均收入):")
pivot = pivot_table(data, index="gender", columns="education", 
                    values="income", aggfunc="mean")
for gender, edu_dict in pivot.items():
    print(f"{gender}: {edu_dict}")

练习 6: 数据转换 - 宽格式转长格式

题目描述: 将宽格式的数据转换为长格式(类似 Stata 的 reshape long)。

宽格式

{"id": 1001, "income_2020": 50000, "income_2021": 55000, "income_2022": 60000}

长格式

[
    {"id": 1001, "year": 2020, "income": 50000},
    {"id": 1001, "year": 2021, "income": 55000},
    {"id": 1001, "year": 2022, "income": 60000}
]

要求

  1. 识别需要转换的列(income_YYYY)
  2. 提取年份
  3. 生成长格式数据
参考答案
python
def wide_to_long(data, id_var, stub, sep="_"):
    """
    Reshape wide records into long format, one row per (id, year).

    Columns named ``<stub><sep><digits>`` (e.g. "income_2020") are turned
    into rows. All other columns are ignored, including columns that share
    the prefix but whose suffix is not purely numeric (e.g.
    "income_source").

    Parameters:
        data (list): wide-format records (dicts)
        id_var (str): identifier field copied into every long row
        stub (str): variable prefix (e.g. "income"); may itself contain `sep`
        sep (str): separator between the stub and the numeric suffix

    Returns:
        list: long-format records with keys id_var, 'year' and stub,
        sorted by (id, year)
    """
    prefix = stub + sep
    long_data = []

    for record in data:
        id_value = record[id_var]

        for key, value in record.items():
            if not key.startswith(prefix):
                continue

            # Slice the prefix off rather than splitting on `sep`, so a
            # stub that contains the separator ("hh_income") still works.
            suffix = key[len(prefix):]
            if not suffix.isdecimal():
                continue

            long_data.append({
                id_var: id_value,
                'year': int(suffix),
                stub: value
            })

    return sorted(long_data, key=lambda row: (row[id_var], row['year']))

def long_to_wide(data, id_var, time_var, value_var):
    """
    Reshape long records into wide format, one row per id.

    Parameters:
        data (list): long-format records (dicts)
        id_var (str): identifier field
        time_var (str): field holding the period (e.g. "year")
        value_var (str): field holding the value (e.g. "income")

    Returns:
        list: one record per id with columns named
        ``<value_var>_<time value>``, sorted by id
    """
    rows_by_id = {}
    for record in data:
        id_val = record[id_var]
        time_val = record[time_var]
        value = record[value_var]

        # Each wide row starts with its id so that it is the first key.
        row = rows_by_id.setdefault(id_val, {id_var: id_val})
        row[f"{value_var}_{time_val}"] = value

    wide_data = list(rows_by_id.values())
    wide_data.sort(key=lambda row: row[id_var])
    return wide_data

# 测试
# 宽格式数据
wide_data = [
    {"id": 1001, "name": "Alice", "income_2020": 50000, "income_2021": 55000, "income_2022": 60000},
    {"id": 1002, "name": "Bob", "income_2020": 60000, "income_2021": 65000, "income_2022": 70000}
]

# 转为长格式
long_data = wide_to_long(wide_data, id_var="id", stub="income", sep="_")
print("长格式数据:")
for record in long_data:
    print(f"  {record}")

# 转回宽格式
wide_again = long_to_wide(long_data, id_var="id", time_var="year", value_var="income")
print("\n转回宽格式:")
for record in wide_again:
    print(f"  {record}")

练习 7: 嵌套字典数据提取

题目描述: 处理嵌套的 JSON 数据,提取特定字段。

输入数据(模拟 API 返回):

python
api_response = {
    "data": {
        "survey": {
            "id": "2024_income_survey",
            "respondents": [
                {
                    "personal": {"id": 1001, "name": "Alice", "age": 25},
                    "economic": {"income": 50000, "employed": True}
                },
                {
                    "personal": {"id": 1002, "name": "Bob", "age": 30},
                    "economic": {"income": 75000, "employed": True}
                }
            ]
        }
    }
}

要求: 提取为扁平化的列表:

python
[
    {"id": 1001, "name": "Alice", "age": 25, "income": 50000, "employed": True},
    {"id": 1002, "name": "Bob", "age": 30, "income": 75000, "employed": True}
]
参考答案
python
def flatten_nested_data(api_response):
    """
    Flatten the respondents of a nested survey API response.

    Parameters:
        api_response (dict): response holding the respondent list under
            ["data"]["survey"]["respondents"]

    Returns:
        list: one flat dict per respondent, combining the "personal" and
        "economic" sections (economic values win on a key clash)
    """
    flat_data = []

    for respondent in api_response["data"]["survey"]["respondents"]:
        combined = {}
        # Either section may be absent; merge whichever ones exist.
        for section in ("personal", "economic"):
            if section in respondent:
                combined.update(respondent[section])
        flat_data.append(combined)

    return flat_data

def extract_nested(data, path):
    """
    Fetch a value from nested dictionaries by a dotted path.

    Parameters:
        data (dict): nested dictionary
        path (str): dot-separated keys (e.g. "data.survey.respondents")

    Returns:
        The value at the path, or None when a key is missing, a stored
        value is None, or a non-dict is reached before the path ends
    """
    node = data

    for part in path.split('.'):
        # Cannot descend into anything that is not a dict.
        if not isinstance(node, dict):
            return None

        node = node.get(part)
        if node is None:
            return None

    return node

def flatten_all_levels(nested_dict, parent_key='', sep='_'):
    """
    Recursively flatten a nested dictionary into a single level.

    Parameters:
        nested_dict (dict): dictionary that may contain further dicts
        parent_key (str): prefix accumulated from the enclosing levels
        sep (str): separator placed between the key parts

    Returns:
        dict: flat dictionary whose keys join the path parts with `sep`
    """
    flat = {}

    for key, value in nested_dict.items():
        full_key = key if not parent_key else f"{parent_key}{sep}{key}"

        if isinstance(value, dict):
            # Descend, carrying the path built so far as the prefix.
            flat.update(flatten_all_levels(value, full_key, sep=sep))
        else:
            flat[full_key] = value

    return flat

# 测试
api_response = {
    "data": {
        "survey": {
            "id": "2024_income_survey",
            "respondents": [
                {
                    "personal": {"id": 1001, "name": "Alice", "age": 25},
                    "economic": {"income": 50000, "employed": True}
                },
                {
                    "personal": {"id": 1002, "name": "Bob", "age": 30},
                    "economic": {"income": 75000, "employed": True}
                }
            ]
        }
    }
}

# 扁平化
flat_data = flatten_nested_data(api_response)
print("扁平化数据:")
for record in flat_data:
    print(f"  {record}")

# 路径提取
respondents = extract_nested(api_response, "data.survey.respondents")
print(f"\n提取的受访者数量: {len(respondents)}")

# 完全扁平化
nested = {"a": {"b": {"c": 1}}, "d": 2}
flat = flatten_all_levels(nested)
print(f"\n完全扁平化: {flat}")

挑战题(8-10题)

练习 8: 社交网络分析 - 好友关系

题目描述: 分析社交网络中的好友关系,找出共同好友、推荐好友等。

数据结构

python
friendships = {
    "Alice": {"Bob", "Carol", "David"},
    "Bob": {"Alice", "David", "Emma"},
    "Carol": {"Alice", "David"},
    "David": {"Alice", "Bob", "Carol", "Emma"},
    "Emma": {"Bob", "David"}
}

要求

  1. 找出两人的共同好友
  2. 推荐好友(好友的好友,但不是自己的好友)
  3. 统计每个人的好友数量排名
参考答案
python
def common_friends(friendships, person1, person2):
    """
    Return the friends that two people share.

    Parameters:
        friendships (dict): {person: set of friends}
        person1, person2 (str): the two people to compare

    Returns:
        set: shared friends; empty when either person is unknown
    """
    both_known = person1 in friendships and person2 in friendships
    if both_known:
        return friendships[person1] & friendships[person2]

    return set()

def friend_recommendations(friendships, person, max_recommendations=5):
    """
    Suggest friends-of-friends who are not yet friends of `person`.

    Parameters:
        friendships (dict): {person: set of friends}
        person (str): who to make suggestions for
        max_recommendations (int): maximum number of suggestions returned

    Returns:
        list: [(candidate, number of mutual friends), ...] ordered by
        mutual-friend count, highest first; empty for an unknown person
    """
    if person not in friendships:
        return []

    direct = friendships[person]
    mutual_counts = {}  # candidate -> number of mutual friends

    for friend in direct:
        # A friend without an entry of their own contributes nothing.
        if friend not in friendships:
            continue

        for candidate in friendships[friend]:
            # Skip the person themselves and anyone already befriended.
            if candidate == person or candidate in direct:
                continue
            mutual_counts[candidate] = mutual_counts.get(candidate, 0) + 1

    ranked = list(mutual_counts.items())
    ranked.sort(key=lambda pair: pair[1], reverse=True)

    return ranked[:max_recommendations]

def friend_count_ranking(friendships):
    """
    Rank people by how many friends they have.

    Parameters:
        friendships (dict): {person: set of friends}

    Returns:
        list: [(person, friend count), ...] in descending count order;
        people with equal counts keep their order from the dictionary
    """
    ranking = [(person, len(friends)) for person, friends in friendships.items()]
    ranking.sort(key=lambda pair: pair[1], reverse=True)
    return ranking

def degrees_of_separation(friendships, person1, person2, max_depth=6):
    """
    Compute how many friendship hops separate two people.

    Runs a breadth-first search outward from person1, one hop per round,
    and gives up after max_depth hops.

    Parameters:
        friendships (dict): {person: set of friends}
        person1, person2 (str): start and target person
        max_depth (int): largest number of hops to explore

    Returns:
        int: number of hops; 0 when both names are the same person, -1
        when either person is unknown or no path exists within max_depth
    """
    if not (person1 in friendships and person2 in friendships):
        return -1

    if person1 == person2:
        return 0

    # `frontier` holds everyone exactly `hops` steps away from person1.
    frontier = [person1]
    seen = {person1}
    hops = 0

    while frontier and hops < max_depth:
        next_frontier = []

        for current in frontier:
            for friend in friendships.get(current, set()):
                if friend == person2:
                    return hops + 1

                if friend not in seen:
                    seen.add(friend)
                    next_frontier.append(friend)

        frontier = next_frontier
        hops += 1

    return -1  # unreachable within max_depth

# 测试
friendships = {
    "Alice": {"Bob", "Carol", "David"},
    "Bob": {"Alice", "David", "Emma"},
    "Carol": {"Alice", "David"},
    "David": {"Alice", "Bob", "Carol", "Emma", "Frank"},
    "Emma": {"Bob", "David"},
    "Frank": {"David"}
}

# 共同好友
common = common_friends(friendships, "Alice", "Bob")
print(f"Alice 和 Bob 的共同好友: {common}")

# 推荐好友
recommendations = friend_recommendations(friendships, "Alice")
print(f"\n给 Alice 的好友推荐:")
for person, count in recommendations:
    print(f"  {person} (有 {count} 个共同好友)")

# 好友数排名
ranking = friend_count_ranking(friendships)
print(f"\n好友数排名:")
for person, count in ranking:
    print(f"  {person}: {count} 个好友")

# 分隔度数
degrees = degrees_of_separation(friendships, "Alice", "Frank")
print(f"\nAlice 和 Frank 之间的分隔度数: {degrees}")

练习 9: 调查问卷逻辑一致性检查

题目描述: 检查问卷回答的逻辑一致性。例如:

  • 如果回答"收入>10万",则不能回答"经济困难"
  • 如果回答"无子女",则不能回答"子女教育支出"

要求

  1. 定义逻辑规则
  2. 批量检查数据
  3. 生成详细的错误报告
参考答案
python
class ConsistencyChecker:
    """
    Logical-consistency checker for questionnaire records.

    Keeps an ordered list of (rule function, error message) pairs. A record
    is consistent when every rule function returns a truthy value for it.
    """

    def __init__(self):
        """Create a checker with no rules registered."""
        self.rules = []
        self.error_messages = {}

    def add_rule(self, rule_func, error_message):
        """
        Register one consistency rule.

        Parameters:
            rule_func (callable): takes a record, returns True when it passes
            error_message (str): message reported when the rule fails
        """
        rule = (rule_func, error_message)
        self.rules.append(rule)

    def check_record(self, record):
        """
        Run every registered rule against a single record.

        Returns:
            tuple: (passed, messages of the failed rules in registration order)
        """
        violations = [
            message for predicate, message in self.rules if not predicate(record)
        ]
        return not violations, violations

    def check_batch(self, records):
        """
        Check a sequence of records and tally the outcome.

        Returns:
            dict: 'total', 'passed' and 'failed' counts plus 'error_details',
                a list with one entry (index, id, errors) per failing record
        """
        total = len(records)
        passed_count = 0
        failures = []

        for position, record in enumerate(records):
            ok, violations = self.check_record(record)
            if ok:
                passed_count += 1
                continue

            failures.append({
                'index': position,
                # Records without an 'id' field are reported as 'Unknown'.
                'id': record.get('id', 'Unknown'),
                'errors': violations
            })

        return {
            'total': total,
            'passed': passed_count,
            'failed': len(failures),
            'error_details': failures
        }

# 定义具体的检查规则
def create_income_survey_checker():
    """
    Build a ConsistencyChecker loaded with the income-survey rules.

    Every rule is a predicate over one record (a dict) that returns True
    when the record is consistent. Rules 1-4 read optional fields through
    dict.get with a default (0, False or None). Rule 5 compares two fields
    with each other and is skipped unless both are present.

    Returns:
        ConsistencyChecker: checker with the five rules registered in order
    """
    checker = ConsistencyChecker()

    # Rule 1: a high earner should not report economic difficulty
    checker.add_rule(
        lambda r: not (r.get('income', 0) > 100000 and r.get('economic_difficulty', False)),
        "高收入(>10万)但声称经济困难"
    )

    # Rule 2: no children means no child-education expense
    checker.add_rule(
        lambda r: not (r.get('num_children', 0) == 0 and r.get('education_expense', 0) > 0),
        "无子女但有教育支出"
    )

    # Rule 3: an unmarried respondent should not report spouse income
    checker.add_rule(
        lambda r: not (not r.get('married', False) and r.get('spouse_income') is not None),
        "未婚但填写了配偶收入"
    )

    # Rule 4: a retired respondent should not report work income
    checker.add_rule(
        lambda r: not (r.get('retired', False) and r.get('work_income', 0) > 0),
        "已退休但有工作收入"
    )

    # Rule 5: age must be at least education years + 6
    def age_covers_schooling(r):
        """Return False only when age is below education_years + 6."""
        age = r.get('age')
        education_years = r.get('education_years')
        # Both answers are required for the comparison. Defaulting them to 0
        # would evaluate 0 < 0 + 6 and flag every record that omits them.
        if age is None or education_years is None:
            return True
        return not (age < education_years + 6)

    checker.add_rule(
        age_covers_schooling,
        "年龄小于教育年限+6岁(不合理)"
    )

    return checker

# Demo: four survey records run through the income-survey checker
survey_data = [
    {
        "id": 1001,
        "income": 120000,
        "economic_difficulty": True,  # Error: income is above 100000
        "num_children": 0,
        "education_expense": 0,
        "married": True,
        "spouse_income": 50000
    },
    {
        "id": 1002,
        "income": 80000,
        "economic_difficulty": False,
        "num_children": 0,
        "education_expense": 5000,  # Error: expense reported with 0 children
        "married": False,
        "spouse_income": None
    },
    {
        "id": 1003,
        "income": 60000,
        "economic_difficulty": False,
        "num_children": 2,
        "education_expense": 10000,
        "married": True,
        "spouse_income": 55000
    },
    {
        "id": 1004,
        "age": 25,
        "education_years": 22,  # Error: 25 is less than 22 + 6
        "income": 50000,
        "married": False
    }
]

# Build the checker
checker = create_income_survey_checker()

# Check all records in one pass
results = checker.check_batch(survey_data)

print(f"检查结果:")
print(f"  总记录数: {results['total']}")
print(f"  通过: {results['passed']}")
print(f"  失败: {results['failed']}")

# Print the failed rules of each failing record
if results['failed'] > 0:
    print(f"\n错误详情:")
    for error in results['error_details']:
        print(f"\n  记录 ID {error['id']} (索引 {error['index']}):")
        for err_msg in error['errors']:
            print(f"    - {err_msg}")

练习 10: 面板数据处理 - 个体-时间索引

题目描述: 处理面板数据(Panel Data),创建双索引(个体 × 时间)的数据结构。

要求

  1. 创建 (个体 ID, 年份) 的双索引
  2. 计算个体的时间序列统计(增长率、累计值)
  3. 检测面板平衡性(是否所有个体都有所有年份的数据)

输入数据

python
panel_data = [
    {"id": 1001, "year": 2020, "income": 50000},
    {"id": 1001, "year": 2021, "income": 55000},
    {"id": 1001, "year": 2022, "income": 60000},
    {"id": 1002, "year": 2020, "income": 60000},
    {"id": 1002, "year": 2021, "income": 65000},
    # 1002 has no record for 2022 (unbalanced panel)
]
参考答案
python
class PanelData:
    """
    Panel (individual x time) data indexed by an (id, time) key.

    Records are dicts. The input list is kept as-is in self.data, and the
    derived methods return copies of the records, so the originals are
    never modified.
    """

    def __init__(self, data, id_var="id", time_var="year"):
        """
        Build the two-level index over the raw records.

        Parameters:
            data (list): raw records (dicts)
            id_var (str): name of the individual-ID field
            time_var (str): name of the time field
        """
        self.id_var = id_var
        self.time_var = time_var
        self.data = data

        # Two-level index {(id, time): record}. When two records share a
        # key, the later one replaces the earlier one.
        self.index = {
            (record[id_var], record[time_var]): record for record in data
        }

        # All individuals and all time points, each in ascending order
        self.ids = sorted({key[0] for key in self.index})
        self.times = sorted({key[1] for key in self.index})

    def get(self, id_value, time_value):
        """Return the record for (id, time), or None when it is absent."""
        return self.index.get((id_value, time_value))

    def _observed_series(self, id_value):
        """Return [(time, record), ...] for one individual in ascending time order."""
        series = []
        # self.times is sorted at construction, so the series needs no re-sort.
        for time_value in self.times:
            record = self.index.get((id_value, time_value))
            if record is not None:
                series.append((time_value, record))
        return series

    def is_balanced(self):
        """
        Check whether every individual has a record for every time point.

        Returns:
            tuple: (is balanced, list of missing (id, time) pairs)
        """
        expected_count = len(self.ids) * len(self.times)
        actual_count = len(self.index)

        # Index keys are a subset of ids x times, so equal counts mean that
        # no combination is missing.
        if expected_count == actual_count:
            return True, []

        missing = []
        for id_val in self.ids:
            for time_val in self.times:
                if (id_val, time_val) not in self.index:
                    missing.append((id_val, time_val))

        return False, missing

    def calculate_growth_rate(self, value_var):
        """
        Compute each individual's period-over-period growth rate.

        Growth is measured against the individual's previous available
        record, even when time points in between are missing. It is None
        for the first record of an individual and whenever the previous
        value is not positive.

        Parameters:
            value_var (str): name of the value field (e.g. income)

        Returns:
            list: record copies with an added '<value_var>_growth' field,
                grouped by individual and ordered by time
        """
        growth_key = f'{value_var}_growth'
        result = []

        for id_val in self.ids:
            series = self._observed_series(id_val)

            for i, (_, record) in enumerate(series):
                new_record = record.copy()
                growth_rate = None

                if i > 0:
                    prev_value = series[i - 1][1][value_var]
                    curr_value = record[value_var]

                    # A zero or negative base gives no meaningful rate.
                    if prev_value > 0:
                        growth_rate = (curr_value - prev_value) / prev_value

                new_record[growth_key] = growth_rate
                result.append(new_record)

        return result

    def calculate_cumsum(self, value_var):
        """
        Compute each individual's running total of value_var over time.

        Parameters:
            value_var (str): name of the value field (e.g. income)

        Returns:
            list: record copies with an added '<value_var>_cumsum' field,
                grouped by individual and ordered by time
        """
        cumsum_key = f'{value_var}_cumsum'
        result = []

        for id_val in self.ids:
            running_total = 0
            for _, record in self._observed_series(id_val):
                new_record = record.copy()
                running_total += record[value_var]
                new_record[cumsum_key] = running_total
                result.append(new_record)

        return result

    def summary(self):
        """
        Summarize the panel.

        Returns:
            dict: counts of individuals, time periods, raw and expected
                observations, the balance flag, the number of missing
                (id, time) pairs, and 'time_range' as (first, last) time
                point, or (None, None) for an empty panel
        """
        balanced, missing = self.is_balanced()

        # self.times is sorted, so its two ends are the range. An empty
        # panel has no time points to take a minimum or maximum of.
        if self.times:
            time_range = (self.times[0], self.times[-1])
        else:
            time_range = (None, None)

        return {
            'num_individuals': len(self.ids),
            'num_time_periods': len(self.times),
            'total_observations': len(self.data),
            'expected_observations': len(self.ids) * len(self.times),
            'is_balanced': balanced,
            'missing_count': len(missing),
            'time_range': time_range
        }

# Demo: three individuals over 2020-2022; 1002 has no 2022 record
panel_data = [
    {"id": 1001, "year": 2020, "income": 50000},
    {"id": 1001, "year": 2021, "income": 55000},
    {"id": 1001, "year": 2022, "income": 60000},
    {"id": 1002, "year": 2020, "income": 60000},
    {"id": 1002, "year": 2021, "income": 65000},
    {"id": 1003, "year": 2020, "income": 70000},
    {"id": 1003, "year": 2021, "income": 75000},
    {"id": 1003, "year": 2022, "income": 80000},
]

# Build the panel object
panel = PanelData(panel_data, id_var="id", time_var="year")

# Summary information
summary = panel.summary()
print("面板数据摘要:")
for key, value in summary.items():
    print(f"  {key}: {value}")

# Balance check
balanced, missing = panel.is_balanced()
if not balanced:
    print(f"\n不平衡面板,缺失 {len(missing)} 条记录:")
    for id_val, year in missing[:5]:  # show at most the first 5
        print(f"  ID {id_val}, 年份 {year}")

# Growth rates
data_with_growth = panel.calculate_growth_rate("income")
print("\n带增长率的数据:")
for record in data_with_growth[:6]:  # show at most the first 6
    print(f"  {record}")

# Cumulative values
data_with_cumsum = panel.calculate_cumsum("income")
print("\n带累计值的数据:")
for record in data_with_cumsum[:6]:
    print(f"  {record}")

延伸阅读

官方文档

推荐资源

性能优化


下一步

恭喜完成 Module 4 的学习!你已经掌握了:

  • Python 的四种核心数据结构(列表、元组、字典、集合)
  • 如何选择合适的数据结构
  • 10 个综合练习题,涵盖数据处理的各种场景

建议

  1. 重点掌握列表和字典:这是最常用的两种结构
  2. 理解集合的优势:去重和集合运算非常高效
  3. 实践嵌套结构:真实数据往往是嵌套的(列表套字典)

在 Module 5 中,我们将学习函数和模块,让代码更加模块化和可复用。

在 Module 9 中,我们将深入学习 Pandas,它整合了所有数据结构的优点!

继续加油!数据结构是数据处理的基石!

基于 MIT 许可证发布。内容版权归作者所有。