小结和复习
掌握 Python 数据结构 —— 列表、字典、元组、集合的完整回顾
本章知识点总结
1. 四种核心数据结构对比
| 特性 | 列表 (List) | 元组 (Tuple) | 字典 (Dict) | 集合 (Set) |
|---|---|---|---|---|
| 符号 | [...] | (...) | {k:v, ...} | {...} |
| 有序 | (3.7+保持插入顺序) | |||
| 可变 | ||||
| 重复 | 键唯一,值可重复 | |||
| 索引方式 | 整数 list[0] | 整数 tuple[0] | 键 dict['key'] | 无索引 |
| 典型用途 | 有序集合 | 不可变记录 | 键值映射 | 唯一值、集合运算 |
2. 列表 (List)
核心特点:有序、可变、允许重复
创建方式:
python
# 方法 1: 直接创建
ages = [25, 30, 35, 40]
# 方法 2: range() 转换
numbers = list(range(10)) # [0, 1, 2, ..., 9]
# 方法 3: 列表推导式
squares = [x**2 for x in range(5)] # [0, 1, 4, 9, 16]常用操作:
python
# 添加元素
ages.append(45) # 末尾添加
ages.insert(0, 20) # 指定位置插入
ages.extend([50, 55]) # 批量添加
# 删除元素
ages.remove(30) # 删除第一个 30
ages.pop() # 删除并返回最后一个
ages.pop(0) # 删除并返回索引 0
del ages[1] # 删除索引 1
ages.clear() # 清空列表
# 查找
index = ages.index(35) # 返回 35 的索引
count = ages.count(30) # 统计 30 出现次数
exists = 30 in ages # 检查是否存在
# 排序
ages.sort() # 原地排序(升序)
ages.sort(reverse=True) # 降序
sorted_ages = sorted(ages) # 返回新列表
ages.reverse() # 反转列表
# 切片
first_three = ages[:3] # 前 3 个
last_two = ages[-2:] # 后 2 个
every_second = ages[::2] # 每隔一个社科应用:
python
# 存储样本 ID
sample_ids = [1001, 1002, 1003, 1004]
# 存储多个年份的数据
years = list(range(2010, 2021)) # [2010, 2011, ..., 2020]
# 筛选有效样本
valid_ages = [age for age in ages if 18 <= age <= 100]3. 元组 (Tuple)
核心特点:有序、不可变、允许重复
创建方式:
python
# 标准创建
coordinates = (10, 20)
# 单元素元组(注意逗号)
single = (42,) # 有逗号
not_tuple = (42) # 这只是整数,不是元组
# 元组解包
x, y = coordinates # x=10, y=20何时使用元组:
- 数据不应被修改(配置参数、常量)
- 作为字典的键(列表不行)
- 函数返回多个值
- 性能要求高(比列表快)
实际应用:
python
# 1. 固定配置
REGRESSION_CONFIG = ("OLS", 0.05, 1000) # 模型类型、显著性、样本量
# 2. 函数返回多个值
def calculate_stats(data):
return (mean(data), std(data), len(data))
mean_val, std_val, n = calculate_stats(incomes)
# 3. 作为字典键
results = {
("Model1", "OLS"): 0.85,
("Model2", "Logit"): 0.78
}
# 4. 数据记录(不可变)
student = (1001, "Alice", 25, "Economics") # ID, 姓名, 年龄, 专业4. 字典 (Dictionary)
核心特点:键值对、无序(3.7+保持插入顺序)、键唯一
创建方式:
python
# 方法 1: 直接创建
student = {"name": "Alice", "age": 25, "major": "Economics"}
# 方法 2: dict() 函数
student = dict(name="Alice", age=25, major="Economics")
# 方法 3: 字典推导式
squares = {x: x**2 for x in range(5)} # {0:0, 1:1, 2:4, 3:9, 4:16}
# 方法 4: 从列表对创建
pairs = [("name", "Alice"), ("age", 25)]
student = dict(pairs)常用操作:
python
# 访问
name = student["name"] # 直接访问(键不存在会报错)
name = student.get("name") # 安全访问
name = student.get("nickname", "Unknown") # 提供默认值
# 修改和添加
student["age"] = 26 # 修改
student["gpa"] = 3.8 # 添加新键
# 删除
del student["age"] # 删除键值对
age = student.pop("age", None) # 删除并返回,提供默认值
# 遍历
for key in student: # 遍历键
print(key, student[key])
for key, value in student.items(): # 遍历键值对
print(key, value)
for value in student.values(): # 遍历值
print(value)
# 检查键是否存在
if "age" in student:
print(student["age"])
# 合并字典
student.update({"gpa": 3.8, "year": 3})社科应用:
python
# 1. 存储个体数据
respondent = {
"id": 1001,
"age": 30,
"income": 75000,
"gender": "Female",
"education": 16
}
# 2. 变量标签映射
var_labels = {
"age": "年龄",
"income": "年收入(元)",
"edu": "受教育年限"
}
# 3. 回归结果
regression_results = {
"coef": 5000.5,
"std_err": 250.3,
"t_value": 19.98,
"p_value": 0.000,
"r_squared": 0.65
}
# 4. 分组统计
income_by_gender = {
"Male": 75000,
"Female": 70000,
"Other": 72500
}5. 集合 (Set)
核心特点:无序、唯一、可变
创建方式:
python
# 方法 1: 直接创建
unique_ids = {1001, 1002, 1003}
# 方法 2: set() 函数(从列表去重)
ids = [1001, 1002, 1003, 1001, 1002]
unique_ids = set(ids) # {1001, 1002, 1003}
# 注意:空集合必须用 set()
empty_set = set() # 空集合
empty_dict = {} # 空字典(不是空集合!)集合运算:
python
group_a = {1, 2, 3, 4, 5}
group_b = {4, 5, 6, 7, 8}
# 并集(所有元素)
union = group_a | group_b # {1, 2, 3, 4, 5, 6, 7, 8}
union = group_a.union(group_b)
# 交集(共同元素)
intersection = group_a & group_b # {4, 5}
intersection = group_a.intersection(group_b)
# 差集(A 有但 B 没有)
difference = group_a - group_b # {1, 2, 3}
difference = group_a.difference(group_b)
# 对称差(只在一个集合中)
sym_diff = group_a ^ group_b # {1, 2, 3, 6, 7, 8}常用操作:
python
# 添加元素
unique_ids.add(1004)
unique_ids.update([1005, 1006])
# 删除元素
unique_ids.remove(1001) # 不存在会报错
unique_ids.discard(1001) # 不存在不报错
# 成员检测(非常快!)
if 1001 in unique_ids:
print("存在")社科应用:
python
# 1. 数据去重
all_respondent_ids = [1001, 1002, 1003, 1001, 1004, 1002]
unique_ids = set(all_respondent_ids)
# 2. 样本匹配(找交集)
treatment_group = {1001, 1002, 1003, 1004}
control_group = {1003, 1004, 1005, 1006}
matched_sample = treatment_group & control_group # {1003, 1004}
# 3. 找出只在处理组的样本
treatment_only = treatment_group - control_group # {1001, 1002}
# 4. 快速检查 ID 是否存在(比列表快)
valid_ids = set(range(1000, 2000))
if respondent_id in valid_ids:
print("有效 ID")选择指南速查表
| 需求 | 推荐数据结构 | 原因 |
|---|---|---|
| 存储有序的成绩 | List | 需要保持顺序 |
| 函数返回多个统计量 | Tuple | 不可变、轻量 |
| 存储学生 ID → 信息 | Dict | 快速查找 |
| 删除重复的 ID | Set | 自动去重 |
| 需要频繁修改的序列 | List | 可变 |
| 配置参数(不应修改) | Tuple | 不可变 |
| 变量名映射 | Dict | 键值对应 |
| 找两组样本的交集 | Set | 集合运算 |
Python vs Stata vs R 对比
列表操作
| 操作 | Python | Stata | R |
|---|---|---|---|
| 创建序列 | list(range(10)) | gen id = _n | 1:10 |
| 添加元素 | list.append(x) | replace | c(list, x) |
| 删除元素 | list.remove(x) | drop if | list[-index] |
| 切片 | list[1:3] | in 1/3 | list[1:3] |
字典/映射
| 操作 | Python | Stata | R |
|---|---|---|---|
| 创建映射 | {"a": 1, "b": 2} | Label values | list(a=1, b=2) |
| 访问值 | dict["a"] | N/A | list$a |
| 遍历 | for k, v in dict.items() | N/A | lapply(list, ...) |
集合操作
| 操作 | Python | Stata | R |
|---|---|---|---|
| 去重 | set(list) | duplicates drop | unique(vector) |
| 交集 | set_a & set_b | merge + keep if _merge==3 | intersect(a, b) |
| 并集 | `set_a | set_b` | append |
| 差集 | set_a - set_b | merge + keep if _merge==1 | setdiff(a, b) |
️ 易错点和最佳实践
易错点 1: 列表索引从 0 开始
python
# 常见错误(以为从 1 开始)
ages = [25, 30, 35, 40]
first = ages[1] # 这是第 2 个元素!实际是 30
# 正确
first = ages[0] # 25(第 1 个)
last = ages[-1] # 40(最后一个)易错点 2: 修改列表的副作用
python
# 错误(浅拷贝陷阱)
original = [1, 2, 3]
copy = original # 这不是拷贝,是引用!
copy.append(4)
print(original) # [1, 2, 3, 4](原列表也被修改了!)
# 正确(深拷贝)
copy = original.copy() # 方法 1
copy = original[:] # 方法 2
copy = list(original) # 方法 3
import copy
deep_copy = copy.deepcopy(nested_list) # 嵌套列表用这个易错点 3: 单元素元组的逗号
python
# 错误(不是元组)
not_tuple = (42)
print(type(not_tuple)) # <class 'int'>
# 正确(必须有逗号)
is_tuple = (42,)
print(type(is_tuple)) # <class 'tuple'>易错点 4: 字典键不存在
python
student = {"name": "Alice", "age": 25}
# 错误(键不存在会报错)
gpa = student["gpa"] # KeyError: 'gpa'
# 正确(安全访问)
gpa = student.get("gpa", 0.0) # 不存在返回 0.0易错点 5: 集合是无序的
python
# 错误(期望保持顺序)
ids = {1003, 1001, 1002}
print(ids) # {1001, 1002, 1003}(可能是任意顺序!)
# 正确(需要顺序用列表)
ids = [1003, 1001, 1002] # 保持插入顺序
unique_ids = []
seen = set()
for id in ids:
if id not in seen:
unique_ids.append(id)
seen.add(id)最佳实践 1: 使用推导式
python
# 不够优雅
squares = []
for x in range(10):
squares.append(x ** 2)
# 更优雅(列表推导式)
squares = [x ** 2 for x in range(10)]
# 字典推导式
id_to_age = {id: age for id, age in zip(ids, ages)}
# 集合推导式
unique_squares = {x ** 2 for x in range(-5, 6)}最佳实践 2: 善用 get() 和 setdefault()
python
# 统计词频
word_count = {}
# 不够优雅
for word in words:
if word in word_count:
word_count[word] += 1
else:
word_count[word] = 1
# 更优雅
for word in words:
word_count[word] = word_count.get(word, 0) + 1
# 或使用 defaultdict
from collections import defaultdict
word_count = defaultdict(int)
for word in words:
word_count[word] += 1最佳实践 3: 合理选择数据结构
python
# 场景:需要快速检查 ID 是否存在
# 使用列表(慢,O(n))
valid_ids = [1001, 1002, 1003, ..., 2000] # 1000 个 ID
if respondent_id in valid_ids: # 需要遍历整个列表
pass
# 使用集合(快,O(1))
valid_ids = set(range(1001, 2001)) # 集合
if respondent_id in valid_ids: # 立即查找
pass综合练习题
基础巩固题(1-3题)
练习 1: 问卷数据去重与排序
题目描述: 处理问卷调查中的重复样本,并按 ID 排序。
要求:
- 删除重复的样本 ID
- 按 ID 升序排序
- 返回去重后的 ID 列表和删除的重复数量
输入输出示例:
python
sample_ids = [1003, 1001, 1004, 1001, 1002, 1003, 1005, 1002]
unique_ids, duplicate_count = remove_duplicates(sample_ids)
print(unique_ids) # [1001, 1002, 1003, 1004, 1005]
print(duplicate_count) # 3提示
- 使用
set()去重 - 使用
sorted()排序 - 计算原始数量 - 去重后数量
参考答案
python
def remove_duplicates(sample_ids):
"""
去重并排序样本 ID
Parameters:
sample_ids (list): 样本 ID 列表(可能有重复)
Returns:
tuple: (去重后的有序列表, 重复数量)
"""
# 方法 1: 使用集合去重
unique_ids = sorted(set(sample_ids))
duplicate_count = len(sample_ids) - len(unique_ids)
return unique_ids, duplicate_count
# 方法 2: 保持原始顺序去重
def remove_duplicates_keep_order(sample_ids):
"""保持第一次出现的顺序"""
seen = set()
unique_ids = []
for id in sample_ids:
if id not in seen:
unique_ids.append(id)
seen.add(id)
duplicate_count = len(sample_ids) - len(unique_ids)
return unique_ids, duplicate_count
# 测试
sample_ids = [1003, 1001, 1004, 1001, 1002, 1003, 1005, 1002]
# 方法 1: 排序后去重
unique_ids, dup_count = remove_duplicates(sample_ids)
print(f"去重后(排序): {unique_ids}")
print(f"删除了 {dup_count} 个重复样本")
# 方法 2: 保持顺序去重
unique_ids2, dup_count2 = remove_duplicates_keep_order(sample_ids)
print(f"去重后(保持顺序): {unique_ids2}")练习 2: 词频统计
题目描述: 统计文本中每个单词出现的次数。
要求:
- 不区分大小写
- 返回字典,键为单词,值为出现次数
- 按频率降序排列(返回列表)
输入输出示例:
python
text = "Python is great. Python is powerful. Python is popular."
word_freq = count_words(text)
print(word_freq)
# {'python': 3, 'is': 3, 'great': 1, 'powerful': 1, 'popular': 1}
sorted_words = sort_by_frequency(word_freq)
print(sorted_words)
# [('python', 3), ('is', 3), ('great', 1), ('powerful', 1), ('popular', 1)]参考答案
python
def count_words(text):
"""
统计单词频率
Parameters:
text (str): 文本
Returns:
dict: 单词频率字典
"""
# 移除标点,转小写,分词
import string
# 移除标点符号
text = text.translate(str.maketrans('', '', string.punctuation))
# 转小写并分词
words = text.lower().split()
# 统计词频
word_freq = {}
for word in words:
word_freq[word] = word_freq.get(word, 0) + 1
return word_freq
def sort_by_frequency(word_freq):
"""
按频率降序排序
Parameters:
word_freq (dict): 单词频率字典
Returns:
list: [(单词, 频率), ...] 按频率降序
"""
# sorted() 返回列表,key 指定排序依据
return sorted(word_freq.items(), key=lambda x: x[1], reverse=True)
# 使用 Counter(更简单)
from collections import Counter
def count_words_v2(text):
"""使用 Counter 实现"""
import string
text = text.translate(str.maketrans('', '', string.punctuation))
words = text.lower().split()
return dict(Counter(words))
def top_n_words(text, n=10):
"""返回出现最多的 n 个单词"""
from collections import Counter
import string
text = text.translate(str.maketrans('', '', string.punctuation))
words = text.lower().split()
counter = Counter(words)
return counter.most_common(n)
# 测试
text = "Python is great. Python is powerful. Python is popular. Python Python."
word_freq = count_words(text)
print("词频统计:", word_freq)
sorted_words = sort_by_frequency(word_freq)
print("\n按频率排序:")
for word, freq in sorted_words:
print(f" {word}: {freq}")
# 使用 Counter
print("\nTop 3 单词:")
print(top_n_words(text, n=3))练习 3: 学生成绩管理
题目描述: 创建一个学生成绩管理系统,使用字典存储学生信息。
要求:
- 添加学生成绩
- 查询学生成绩
- 计算班级平均分
- 找出最高分和最低分的学生
输入输出示例:
python
grade_book = GradeBook()
grade_book.add_student("Alice", 95)
grade_book.add_student("Bob", 85)
grade_book.add_student("Carol", 90)
print(grade_book.get_grade("Alice")) # 95
print(grade_book.class_average()) # 90.0
print(grade_book.top_student()) # ("Alice", 95)
print(grade_book.bottom_student()) # ("Bob", 85)参考答案
python
class GradeBook:
"""学生成绩管理系统"""
def __init__(self):
"""初始化空成绩簿"""
self.grades = {} # {学生姓名: 成绩}
def add_student(self, name, grade):
"""
添加学生成绩
Parameters:
name (str): 学生姓名
grade (float): 成绩
"""
self.grades[name] = grade
def get_grade(self, name):
"""
查询学生成绩
Parameters:
name (str): 学生姓名
Returns:
float or None: 成绩(不存在返回 None)
"""
return self.grades.get(name)
def class_average(self):
"""
计算班级平均分
Returns:
float: 平均分(无学生返回 0)
"""
if not self.grades:
return 0.0
return sum(self.grades.values()) / len(self.grades)
def top_student(self):
"""
找出最高分学生
Returns:
tuple: (姓名, 成绩)
"""
if not self.grades:
return None
top_name = max(self.grades, key=self.grades.get)
return (top_name, self.grades[top_name])
def bottom_student(self):
"""找出最低分学生"""
if not self.grades:
return None
bottom_name = min(self.grades, key=self.grades.get)
return (bottom_name, self.grades[bottom_name])
def students_above_average(self):
"""返回高于平均分的学生"""
avg = self.class_average()
return {name: grade for name, grade in self.grades.items()
if grade > avg}
def grade_distribution(self):
"""
成绩分布统计
Returns:
dict: {'A': count, 'B': count, ...}
"""
distribution = {'A': 0, 'B': 0, 'C': 0, 'D': 0, 'F': 0}
for grade in self.grades.values():
if grade >= 90:
distribution['A'] += 1
elif grade >= 80:
distribution['B'] += 1
elif grade >= 70:
distribution['C'] += 1
elif grade >= 60:
distribution['D'] += 1
else:
distribution['F'] += 1
return distribution
def __str__(self):
"""字符串表示"""
return f"GradeBook with {len(self.grades)} students"
# 测试
grade_book = GradeBook()
grade_book.add_student("Alice", 95)
grade_book.add_student("Bob", 85)
grade_book.add_student("Carol", 90)
grade_book.add_student("David", 75)
grade_book.add_student("Emma", 92)
print(f"Alice 的成绩: {grade_book.get_grade('Alice')}")
print(f"班级平均分: {grade_book.class_average():.1f}")
print(f"最高分: {grade_book.top_student()}")
print(f"最低分: {grade_book.bottom_student()}")
print(f"\n高于平均分的学生:")
for name, grade in grade_book.students_above_average().items():
print(f" {name}: {grade}")
print(f"\n成绩分布: {grade_book.grade_distribution()}")综合应用题(4-7题)
练习 4: 问卷数据合并
题目描述: 合并两份问卷数据(基础信息和补充信息),根据受访者 ID 进行匹配。
要求:
- 按 ID 合并两个字典列表
- 处理只在一份问卷中的样本
- 返回合并后的完整数据
输入输出示例:
python
basic_info = [
{"id": 1001, "name": "Alice", "age": 25},
{"id": 1002, "name": "Bob", "age": 30},
{"id": 1003, "name": "Carol", "age": 35}
]
supplement = [
{"id": 1001, "income": 50000, "education": 16},
{"id": 1002, "income": 75000, "education": 18},
{"id": 1004, "income": 60000, "education": 14} # 1004 不在基础信息中
]
merged = merge_surveys(basic_info, supplement, on="id")
# [
# {"id": 1001, "name": "Alice", "age": 25, "income": 50000, "education": 16},
# {"id": 1002, "name": "Bob", "age": 30, "income": 75000, "education": 18},
# {"id": 1003, "name": "Carol", "age": 35, "income": None, "education": None},
# {"id": 1004, "name": None, "age": None, "income": 60000, "education": 14}
# ]参考答案
python
def merge_surveys(basic_info, supplement, on="id", how="outer"):
"""
合并两份问卷数据
Parameters:
basic_info (list): 基础信息列表
supplement (list): 补充信息列表
on (str): 用于匹配的键
how (str): 合并方式 ("inner", "left", "right", "outer")
Returns:
list: 合并后的数据
"""
# 转换为字典以便快速查找
basic_dict = {record[on]: record for record in basic_info}
supp_dict = {record[on]: record for record in supplement}
# 根据合并方式确定要保留的 ID
if how == "inner":
ids = set(basic_dict.keys()) & set(supp_dict.keys())
elif how == "left":
ids = set(basic_dict.keys())
elif how == "right":
ids = set(supp_dict.keys())
else: # outer
ids = set(basic_dict.keys()) | set(supp_dict.keys())
# 合并
merged = []
for id in sorted(ids):
record = {on: id}
# 从基础信息获取字段
if id in basic_dict:
record.update(basic_dict[id])
else:
# 添加 None 占位
basic_keys = set()
for r in basic_info:
basic_keys.update(r.keys())
for key in basic_keys:
if key != on and key not in record:
record[key] = None
# 从补充信息获取字段
if id in supp_dict:
supp_data = supp_dict[id].copy()
supp_data.pop(on, None) # 移除重复的 ID 字段
record.update(supp_data)
else:
# 添加 None 占位
supp_keys = set()
for r in supplement:
supp_keys.update(r.keys())
for key in supp_keys:
if key != on and key not in record:
record[key] = None
merged.append(record)
return merged
def merge_statistics(basic_info, supplement, on="id"):
"""
合并统计信息
Returns:
dict: 合并统计
"""
basic_ids = {record[on] for record in basic_info}
supp_ids = {record[on] for record in supplement}
return {
"basic_only": len(basic_ids - supp_ids),
"supplement_only": len(supp_ids - basic_ids),
"both": len(basic_ids & supp_ids),
"total": len(basic_ids | supp_ids)
}
# 测试
basic_info = [
{"id": 1001, "name": "Alice", "age": 25},
{"id": 1002, "name": "Bob", "age": 30},
{"id": 1003, "name": "Carol", "age": 35}
]
supplement = [
{"id": 1001, "income": 50000, "education": 16},
{"id": 1002, "income": 75000, "education": 18},
{"id": 1004, "income": 60000, "education": 14}
]
# 外连接(保留所有)
merged = merge_surveys(basic_info, supplement, on="id", how="outer")
print("外连接(保留所有):")
for record in merged:
print(f" {record}")
# 内连接(只保留匹配的)
merged_inner = merge_surveys(basic_info, supplement, on="id", how="inner")
print(f"\n内连接(只保留匹配的): {len(merged_inner)} 条记录")
# 统计信息
stats = merge_statistics(basic_info, supplement, on="id")
print(f"\n合并统计:")
print(f" 只在基础信息: {stats['basic_only']}")
print(f" 只在补充信息: {stats['supplement_only']}")
print(f" 两者都有: {stats['both']}")
print(f" 总计: {stats['total']}")练习 5: 分组聚合统计
题目描述: 对问卷数据按性别分组,计算各组的统计量。
要求:
- 按性别分组
- 计算各组的人数、平均收入、平均年龄
- 返回字典格式的结果
输入输出示例:
python
data = [
{"id": 1001, "gender": "Male", "age": 25, "income": 50000},
{"id": 1002, "gender": "Female", "age": 30, "income": 60000},
{"id": 1003, "gender": "Male", "age": 35, "income": 75000},
{"id": 1004, "gender": "Female", "age": 28, "income": 55000},
]
result = group_statistics(data, by="gender")
# {
# 'Male': {'count': 2, 'avg_age': 30.0, 'avg_income': 62500.0},
# 'Female': {'count': 2, 'avg_age': 29.0, 'avg_income': 57500.0}
# }参考答案
python
def group_statistics(data, by="gender", agg_fields=None):
"""
分组聚合统计
Parameters:
data (list): 数据列表
by (str): 分组字段
agg_fields (list): 要聚合的数值字段(None 表示所有数值字段)
Returns:
dict: 分组统计结果
"""
# 自动检测数值字段
if agg_fields is None:
agg_fields = []
if data:
for key, value in data[0].items():
if isinstance(value, (int, float)) and key != by:
agg_fields.append(key)
# 分组收集数据
groups = {}
for record in data:
group_key = record[by]
if group_key not in groups:
groups[group_key] = []
groups[group_key].append(record)
# 计算统计量
result = {}
for group_key, group_records in groups.items():
stats = {'count': len(group_records)}
for field in agg_fields:
values = [r[field] for r in group_records if field in r]
if values:
stats[f'avg_{field}'] = sum(values) / len(values)
stats[f'min_{field}'] = min(values)
stats[f'max_{field}'] = max(values)
result[group_key] = stats
return result
def pivot_table(data, index, columns, values, aggfunc='mean'):
"""
创建透视表
Parameters:
data (list): 数据
index (str): 行分组字段
columns (str): 列分组字段
values (str): 值字段
aggfunc (str): 聚合函数 ('mean', 'sum', 'count')
Returns:
dict: 嵌套字典 {index_value: {column_value: aggregated_value}}
"""
from collections import defaultdict
# 收集数据
groups = defaultdict(lambda: defaultdict(list))
for record in data:
index_val = record[index]
column_val = record[columns]
value = record[values]
groups[index_val][column_val].append(value)
# 聚合
result = {}
for index_val, column_dict in groups.items():
result[index_val] = {}
for column_val, values_list in column_dict.items():
if aggfunc == 'mean':
result[index_val][column_val] = sum(values_list) / len(values_list)
elif aggfunc == 'sum':
result[index_val][column_val] = sum(values_list)
elif aggfunc == 'count':
result[index_val][column_val] = len(values_list)
return result
# 测试
data = [
{"id": 1001, "gender": "Male", "age": 25, "income": 50000, "education": 16},
{"id": 1002, "gender": "Female", "age": 30, "income": 60000, "education": 18},
{"id": 1003, "gender": "Male", "age": 35, "income": 75000, "education": 16},
{"id": 1004, "gender": "Female", "age": 28, "income": 55000, "education": 14},
{"id": 1005, "gender": "Male", "age": 40, "income": 80000, "education": 20},
]
# 分组统计
result = group_statistics(data, by="gender")
print("按性别分组统计:")
for gender, stats in result.items():
print(f"\n{gender}:")
for key, value in stats.items():
if isinstance(value, float):
print(f" {key}: {value:.2f}")
else:
print(f" {key}: {value}")
# 透视表:性别 × 教育水平 → 平均收入
print("\n透视表(性别 × 教育水平 → 平均收入):")
pivot = pivot_table(data, index="gender", columns="education",
values="income", aggfunc="mean")
for gender, edu_dict in pivot.items():
print(f"{gender}: {edu_dict}")练习 6: 数据转换 - 宽格式转长格式
题目描述: 将宽格式的数据转换为长格式(类似 Stata 的 reshape long)。
宽格式:
{"id": 1001, "income_2020": 50000, "income_2021": 55000, "income_2022": 60000}长格式:
[
{"id": 1001, "year": 2020, "income": 50000},
{"id": 1001, "year": 2021, "income": 55000},
{"id": 1001, "year": 2022, "income": 60000}
]要求:
- 识别需要转换的列(income_YYYY)
- 提取年份
- 生成长格式数据
参考答案
python
def wide_to_long(data, id_var, stub, sep="_"):
"""
宽格式转长格式
Parameters:
data (list): 宽格式数据
id_var (str): ID 字段
stub (str): 变量前缀(如 "income")
sep (str): 分隔符
Returns:
list: 长格式数据
"""
long_data = []
for record in data:
# 提取固定字段
id_value = record[id_var]
# 查找所有匹配的列
for key, value in record.items():
if key.startswith(stub + sep):
# 提取后缀(如年份)
suffix = key.split(sep)[1]
# 创建长格式记录
long_record = {
id_var: id_value,
'year': int(suffix),
stub: value
}
long_data.append(long_record)
return sorted(long_data, key=lambda x: (x[id_var], x['year']))
def long_to_wide(data, id_var, time_var, value_var):
"""
长格式转宽格式
Parameters:
data (list): 长格式数据
id_var (str): ID 字段
time_var (str): 时间字段(如 year)
value_var (str): 值字段(如 income)
Returns:
list: 宽格式数据
"""
from collections import defaultdict
# 收集数据
wide_dict = defaultdict(dict)
for record in data:
id_val = record[id_var]
time_val = record[time_var]
value = record[value_var]
wide_dict[id_val][f"{value_var}_{time_val}"] = value
# 转换为列表
wide_data = []
for id_val, time_dict in wide_dict.items():
wide_record = {id_var: id_val}
wide_record.update(time_dict)
wide_data.append(wide_record)
return sorted(wide_data, key=lambda x: x[id_var])
# 测试
# 宽格式数据
wide_data = [
{"id": 1001, "name": "Alice", "income_2020": 50000, "income_2021": 55000, "income_2022": 60000},
{"id": 1002, "name": "Bob", "income_2020": 60000, "income_2021": 65000, "income_2022": 70000}
]
# 转为长格式
long_data = wide_to_long(wide_data, id_var="id", stub="income", sep="_")
print("长格式数据:")
for record in long_data:
print(f" {record}")
# 转回宽格式
wide_again = long_to_wide(long_data, id_var="id", time_var="year", value_var="income")
print("\n转回宽格式:")
for record in wide_again:
print(f" {record}")练习 7: 嵌套字典数据提取
题目描述: 处理嵌套的 JSON 数据,提取特定字段。
输入数据(模拟 API 返回):
python
api_response = {
"data": {
"survey": {
"id": "2024_income_survey",
"respondents": [
{
"personal": {"id": 1001, "name": "Alice", "age": 25},
"economic": {"income": 50000, "employed": True}
},
{
"personal": {"id": 1002, "name": "Bob", "age": 30},
"economic": {"income": 75000, "employed": True}
}
]
}
}
}要求: 提取为扁平化的列表:
python
[
{"id": 1001, "name": "Alice", "age": 25, "income": 50000, "employed": True},
{"id": 1002, "name": "Bob", "age": 30, "income": 75000, "employed": True}
]参考答案
python
def flatten_nested_data(api_response):
"""
扁平化嵌套数据
Parameters:
api_response (dict): 嵌套的 API 响应
Returns:
list: 扁平化的数据列表
"""
respondents = api_response["data"]["survey"]["respondents"]
flat_data = []
for resp in respondents:
flat_record = {}
# 合并 personal 和 economic 字段
if "personal" in resp:
flat_record.update(resp["personal"])
if "economic" in resp:
flat_record.update(resp["economic"])
flat_data.append(flat_record)
return flat_data
def extract_nested(data, path):
"""
根据路径提取嵌套数据
Parameters:
data (dict): 嵌套字典
path (str): 路径,用点分隔(如 "data.survey.respondents")
Returns:
提取的数据
"""
keys = path.split('.')
result = data
for key in keys:
if isinstance(result, dict):
result = result.get(key)
else:
return None
if result is None:
return None
return result
def flatten_all_levels(nested_dict, parent_key='', sep='_'):
"""
递归扁平化所有层级
Parameters:
nested_dict (dict): 嵌套字典
parent_key (str): 父键名
sep (str): 分隔符
Returns:
dict: 扁平化的字典
"""
items = []
for k, v in nested_dict.items():
new_key = f"{parent_key}{sep}{k}" if parent_key else k
if isinstance(v, dict):
items.extend(flatten_all_levels(v, new_key, sep=sep).items())
else:
items.append((new_key, v))
return dict(items)
# 测试
api_response = {
"data": {
"survey": {
"id": "2024_income_survey",
"respondents": [
{
"personal": {"id": 1001, "name": "Alice", "age": 25},
"economic": {"income": 50000, "employed": True}
},
{
"personal": {"id": 1002, "name": "Bob", "age": 30},
"economic": {"income": 75000, "employed": True}
}
]
}
}
}
# 扁平化
flat_data = flatten_nested_data(api_response)
print("扁平化数据:")
for record in flat_data:
print(f" {record}")
# 路径提取
respondents = extract_nested(api_response, "data.survey.respondents")
print(f"\n提取的受访者数量: {len(respondents)}")
# 完全扁平化
nested = {"a": {"b": {"c": 1}}, "d": 2}
flat = flatten_all_levels(nested)
print(f"\n完全扁平化: {flat}")挑战题(8-10题)
练习 8: 社交网络分析 - 好友关系
题目描述: 分析社交网络中的好友关系,找出共同好友、推荐好友等。
数据结构:
python
friendships = {
"Alice": {"Bob", "Carol", "David"},
"Bob": {"Alice", "David", "Emma"},
"Carol": {"Alice", "David"},
"David": {"Alice", "Bob", "Carol", "Emma"},
"Emma": {"Bob", "David"}
}要求:
- 找出两人的共同好友
- 推荐好友(好友的好友,但不是自己的好友)
- 统计每个人的好友数量排名
参考答案
python
def common_friends(friendships, person1, person2):
"""
找出两人的共同好友
Parameters:
friendships (dict): 好友关系字典 {人名: 好友集合}
person1, person2 (str): 两个人的名字
Returns:
set: 共同好友集合
"""
if person1 not in friendships or person2 not in friendships:
return set()
return friendships[person1] & friendships[person2]
def friend_recommendations(friendships, person, max_recommendations=5):
"""
推荐好友(好友的好友,但不是自己的好友)
Parameters:
friendships (dict): 好友关系字典
person (str): 要推荐的人
max_recommendations (int): 最多推荐数量
Returns:
list: [(推荐人名, 共同好友数), ...] 按共同好友数降序
"""
if person not in friendships:
return []
my_friends = friendships[person]
recommendations = {} # {候选人: 共同好友数}
# 遍历我的好友
for friend in my_friends:
if friend not in friendships:
continue
# 遍历好友的好友
for friend_of_friend in friendships[friend]:
# 排除自己和已经是好友的人
if friend_of_friend != person and friend_of_friend not in my_friends:
# 统计共同好友数
recommendations[friend_of_friend] = recommendations.get(friend_of_friend, 0) + 1
# 按共同好友数排序
sorted_recommendations = sorted(recommendations.items(),
key=lambda x: x[1],
reverse=True)
return sorted_recommendations[:max_recommendations]
def friend_count_ranking(friendships):
"""
统计好友数量排名
Returns:
list: [(人名, 好友数), ...] 按好友数降序
"""
counts = {person: len(friends) for person, friends in friendships.items()}
return sorted(counts.items(), key=lambda x: x[1], reverse=True)
def degrees_of_separation(friendships, person1, person2, max_depth=6):
"""
计算两人之间的分隔度数(六度分隔理论)
使用 BFS 算法
Returns:
int: 分隔度数(-1 表示不连通)
"""
if person1 not in friendships or person2 not in friendships:
return -1
if person1 == person2:
return 0
# BFS
from collections import deque
queue = deque([(person1, 0)]) # (当前人, 距离)
visited = {person1}
while queue:
current, distance = queue.popleft()
if distance >= max_depth:
break
for friend in friendships.get(current, set()):
if friend == person2:
return distance + 1
if friend not in visited:
visited.add(friend)
queue.append((friend, distance + 1))
return -1 # 不连通
# 测试
friendships = {
"Alice": {"Bob", "Carol", "David"},
"Bob": {"Alice", "David", "Emma"},
"Carol": {"Alice", "David"},
"David": {"Alice", "Bob", "Carol", "Emma", "Frank"},
"Emma": {"Bob", "David"},
"Frank": {"David"}
}
# 共同好友
common = common_friends(friendships, "Alice", "Bob")
print(f"Alice 和 Bob 的共同好友: {common}")
# 推荐好友
recommendations = friend_recommendations(friendships, "Alice")
print(f"\n给 Alice 的好友推荐:")
for person, count in recommendations:
print(f" {person} (有 {count} 个共同好友)")
# 好友数排名
ranking = friend_count_ranking(friendships)
print(f"\n好友数排名:")
for person, count in ranking:
print(f" {person}: {count} 个好友")
# 分隔度数
degrees = degrees_of_separation(friendships, "Alice", "Frank")
print(f"\nAlice 和 Frank 之间的分隔度数: {degrees}")练习 9: 调查问卷逻辑一致性检查
题目描述: 检查问卷回答的逻辑一致性。例如:
- 如果回答"收入>10万",则不能回答"经济困难"
- 如果回答"无子女",则不能回答"子女教育支出"
要求:
- 定义逻辑规则
- 批量检查数据
- 生成详细的错误报告
参考答案
python
class ConsistencyChecker:
"""问卷逻辑一致性检查器"""
def __init__(self):
"""初始化规则"""
self.rules = []
self.error_messages = {}
def add_rule(self, rule_func, error_message):
"""
添加检查规则
Parameters:
rule_func (callable): 规则函数,返回 True 表示通过
error_message (str): 错误信息
"""
self.rules.append((rule_func, error_message))
def check_record(self, record):
"""
检查单条记录
Returns:
tuple: (是否通过, 错误列表)
"""
errors = []
for rule_func, error_message in self.rules:
if not rule_func(record):
errors.append(error_message)
return len(errors) == 0, errors
def check_batch(self, records):
"""
批量检查
Returns:
dict: 检查结果统计
"""
results = {
'total': len(records),
'passed': 0,
'failed': 0,
'error_details': []
}
for i, record in enumerate(records):
passed, errors = self.check_record(record)
if passed:
results['passed'] += 1
else:
results['failed'] += 1
results['error_details'].append({
'index': i,
'id': record.get('id', 'Unknown'),
'errors': errors
})
return results
# 定义具体的检查规则
def create_income_survey_checker():
"""创建收入调查的一致性检查器"""
checker = ConsistencyChecker()
# 规则 1: 高收入者不应该经济困难
checker.add_rule(
lambda r: not (r.get('income', 0) > 100000 and r.get('economic_difficulty', False)),
"高收入(>10万)但声称经济困难"
)
# 规则 2: 无子女不应有子女教育支出
checker.add_rule(
lambda r: not (r.get('num_children', 0) == 0 and r.get('education_expense', 0) > 0),
"无子女但有教育支出"
)
# 规则 3: 未婚不应有配偶收入
checker.add_rule(
lambda r: not (not r.get('married', False) and r.get('spouse_income') is not None),
"未婚但填写了配偶收入"
)
# 规则 4: 退休者不应有工作收入
checker.add_rule(
lambda r: not (r.get('retired', False) and r.get('work_income', 0) > 0),
"已退休但有工作收入"
)
# 规则 5: 年龄与教育年限不匹配
checker.add_rule(
lambda r: not (r.get('age', 0) < r.get('education_years', 0) + 6),
"年龄小于教育年限+6岁(不合理)"
)
return checker
# 测试
survey_data = [
{
"id": 1001,
"income": 120000,
"economic_difficulty": True, # 错误!
"num_children": 0,
"education_expense": 0,
"married": True,
"spouse_income": 50000
},
{
"id": 1002,
"income": 80000,
"economic_difficulty": False,
"num_children": 0,
"education_expense": 5000, # 错误!
"married": False,
"spouse_income": None
},
{
"id": 1003,
"income": 60000,
"economic_difficulty": False,
"num_children": 2,
"education_expense": 10000,
"married": True,
"spouse_income": 55000
},
{
"id": 1004,
"age": 25,
"education_years": 22, # 错误!
"income": 50000,
"married": False
}
]
# 创建检查器
checker = create_income_survey_checker()
# 批量检查
results = checker.check_batch(survey_data)
print(f"检查结果:")
print(f" 总记录数: {results['total']}")
print(f" 通过: {results['passed']}")
print(f" 失败: {results['failed']}")
if results['failed'] > 0:
print(f"\n错误详情:")
for error in results['error_details']:
print(f"\n 记录 ID {error['id']} (索引 {error['index']}):")
for err_msg in error['errors']:
print(f" - {err_msg}")练习 10: 面板数据处理 - 个体-时间索引
题目描述: 处理面板数据(Panel Data),创建双索引(个体 × 时间)的数据结构。
要求:
- 创建 (个体 ID, 年份) 的双索引
- 计算个体的时间序列统计(增长率、累计值)
- 检测面板平衡性(是否所有个体都有所有年份的数据)
输入数据:
python
panel_data = [
{"id": 1001, "year": 2020, "income": 50000},
{"id": 1001, "year": 2021, "income": 55000},
{"id": 1001, "year": 2022, "income": 60000},
{"id": 1002, "year": 2020, "income": 60000},
{"id": 1002, "year": 2021, "income": 65000},
# 1002 缺少 2022 年数据(不平衡面板)
]参考答案
python
class PanelData:
"""面板数据处理类"""
def __init__(self, data, id_var="id", time_var="year"):
"""
初始化面板数据
Parameters:
data (list): 原始数据
id_var (str): 个体 ID 字段
time_var (str): 时间字段
"""
self.id_var = id_var
self.time_var = time_var
self.data = data
# 创建双索引字典 {(id, year): record}
self.index = {}
for record in data:
key = (record[id_var], record[time_var])
self.index[key] = record
# 提取所有个体和时间点
self.ids = sorted(set(r[id_var] for r in data))
self.times = sorted(set(r[time_var] for r in data))
def get(self, id_value, time_value):
"""获取特定 (id, time) 的记录"""
return self.index.get((id_value, time_value))
def is_balanced(self):
"""
检查是否为平衡面板
Returns:
tuple: (是否平衡, 缺失记录列表)
"""
expected_count = len(self.ids) * len(self.times)
actual_count = len(self.index)
if expected_count == actual_count:
return True, []
# 找出缺失的记录
missing = []
for id_val in self.ids:
for time_val in self.times:
if (id_val, time_val) not in self.index:
missing.append((id_val, time_val))
return False, missing
def calculate_growth_rate(self, value_var):
"""
计算个体的增长率
Parameters:
value_var (str): 值变量(如 income)
Returns:
list: 包含增长率的新数据
"""
result = []
for id_val in self.ids:
# 获取该个体的所有时间点数据
id_data = [(t, self.get(id_val, t)) for t in self.times]
id_data = [(t, r) for t, r in id_data if r is not None]
id_data.sort(key=lambda x: x[0])
# 计算增长率
for i, (time_val, record) in enumerate(id_data):
new_record = record.copy()
if i > 0:
prev_time, prev_record = id_data[i - 1]
prev_value = prev_record[value_var]
curr_value = record[value_var]
if prev_value > 0:
growth_rate = (curr_value - prev_value) / prev_value
new_record[f'{value_var}_growth'] = growth_rate
else:
new_record[f'{value_var}_growth'] = None
else:
new_record[f'{value_var}_growth'] = None
result.append(new_record)
return result
def calculate_cumsum(self, value_var):
"""计算个体的累计值"""
result = []
for id_val in self.ids:
id_data = [(t, self.get(id_val, t)) for t in self.times]
id_data = [(t, r) for t, r in id_data if r is not None]
id_data.sort(key=lambda x: x[0])
cumsum = 0
for time_val, record in id_data:
new_record = record.copy()
cumsum += record[value_var]
new_record[f'{value_var}_cumsum'] = cumsum
result.append(new_record)
return result
def summary(self):
"""面板数据摘要"""
balanced, missing = self.is_balanced()
return {
'num_individuals': len(self.ids),
'num_time_periods': len(self.times),
'total_observations': len(self.data),
'expected_observations': len(self.ids) * len(self.times),
'is_balanced': balanced,
'missing_count': len(missing),
'time_range': (min(self.times), max(self.times))
}
# 测试
panel_data = [
{"id": 1001, "year": 2020, "income": 50000},
{"id": 1001, "year": 2021, "income": 55000},
{"id": 1001, "year": 2022, "income": 60000},
{"id": 1002, "year": 2020, "income": 60000},
{"id": 1002, "year": 2021, "income": 65000},
{"id": 1003, "year": 2020, "income": 70000},
{"id": 1003, "year": 2021, "income": 75000},
{"id": 1003, "year": 2022, "income": 80000},
]
# 创建面板数据对象
panel = PanelData(panel_data, id_var="id", time_var="year")
# 摘要信息
summary = panel.summary()
print("面板数据摘要:")
for key, value in summary.items():
print(f" {key}: {value}")
# 平衡性检查
balanced, missing = panel.is_balanced()
if not balanced:
print(f"\n不平衡面板,缺失 {len(missing)} 条记录:")
for id_val, year in missing[:5]: # 只显示前 5 个
print(f" ID {id_val}, 年份 {year}")
# 计算增长率
data_with_growth = panel.calculate_growth_rate("income")
print("\n带增长率的数据:")
for record in data_with_growth[:6]: # 只显示前 6 个
print(f" {record}")
# 计算累计值
data_with_cumsum = panel.calculate_cumsum("income")
print("\n带累计值的数据:")
for record in data_with_cumsum[:6]:
print(f" {record}")延伸阅读
官方文档
推荐资源
性能优化
下一步
恭喜完成 Module 4 的学习!你已经掌握了:
- Python 的四种核心数据结构(列表、元组、字典、集合)
- 如何选择合适的数据结构
- 10 个综合练习题,涵盖数据处理的各种场景
建议:
- 重点掌握列表和字典:这是最常用的两种结构
- 理解集合的优势:去重和集合运算非常高效
- 实践嵌套结构:真实数据往往是嵌套的(列表套字典)
在 Module 5 中,我们将学习函数和模块,让代码更加模块化和可复用。
在 Module 9 中,我们将深入学习 Pandas,它整合了所有数据结构的优点!
继续加油!数据结构是数据处理的基石!