Module 7 Summary and Review
File Operations — Reading and Storing Data
Knowledge Summary
1. Text File Operations
Basic modes:
- 'r': Read-only (default)
- 'w': Write (overwrite)
- 'a': Append
- 'r+': Read and write (see the sketch below)
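The examples below cover 'r', 'w', and 'a'. For 'r+', here is a minimal sketch, assuming an existing file (the name notes.txt is hypothetical):

# 'r+' opens an existing file for reading and writing; it does not create the file
with open('notes.txt', 'r+', encoding='utf-8') as f:
    content = f.read()       # read everything; the position is now at the end of the file
    f.write("\nReviewed")    # writes from the current position, so this appends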
Reading methods:
# Method 1: Read entire file
with open('file.txt', 'r', encoding='utf-8') as f:
    content = f.read()

# Method 2: Read line by line
with open('file.txt', 'r', encoding='utf-8') as f:
    for line in f:
        print(line.strip())

# Method 3: Read all lines to list
with open('file.txt', 'r', encoding='utf-8') as f:
    lines = f.readlines()

Writing methods:
# Overwrite
with open('output.txt', 'w', encoding='utf-8') as f:
    f.write("Content\n")

# Append
with open('output.txt', 'a', encoding='utf-8') as f:
    f.write("Appended content\n")

2. CSV File Processing
Using csv module:
import csv

# Write
with open('data.csv', 'w', newline='', encoding='utf-8') as f:
    writer = csv.writer(f)
    writer.writerow(['Name', 'Age'])
    writer.writerow(['Alice', 25])

# Read
with open('data.csv', 'r', encoding='utf-8') as f:
    reader = csv.reader(f)
    for row in reader:
        print(row)

Using Pandas (recommended):
import pandas as pd

# Read
df = pd.read_csv('data.csv', encoding='utf-8')

# Write
df.to_csv('output.csv', index=False, encoding='utf-8-sig')  # utf-8-sig prevents Excel garbling

3. Excel File Processing
Install dependencies:
pip install openpyxl  # For .xlsx

Read Excel:
import pandas as pd
# Read single worksheet
df = pd.read_excel('data.xlsx', sheet_name='Sheet1')
# Read multiple worksheets
excel_file = pd.ExcelFile('data.xlsx')
df1 = pd.read_excel(excel_file, sheet_name='2023')
df2 = pd.read_excel(excel_file, sheet_name='2024')

Write Excel:
# Single worksheet
df.to_excel('output.xlsx', sheet_name='Results', index=False)

# Multiple worksheets
with pd.ExcelWriter('report.xlsx') as writer:
    df1.to_excel(writer, sheet_name='2023', index=False)
    df2.to_excel(writer, sheet_name='2024', index=False)

4. Stata File Reading and Writing
Read .dta files:
import pandas as pd

# Basic reading
df = pd.read_stata('survey.dta')

# Preserve categorical variables and labels
df = pd.read_stata('survey.dta', convert_categoricals=True)

Write .dta files:
# Save in Stata 13 format (readable by Stata 13 and later)
df.to_stata('output.dta', write_index=False, version=117)

# Add variable labels
variable_labels = {
    'age': 'Respondent Age',
    'income': 'Annual Income'
}
df.to_stata('output.dta', write_index=False, variable_labels=variable_labels)

Version mapping:
- 117 = Stata 13 and later
- 118 = Stata 14 and later
- 119 = Stata 15 and later (only needed for datasets with more than 32,767 variables)
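The version numbers above are passed directly to to_stata(version=...). To read label metadata back out of a .dta file, read_stata with iterator=True returns a StataReader object; a minimal sketch, assuming a file named survey.dta:

import pandas as pd

# Open as a StataReader to access label metadata alongside the data
with pd.read_stata('survey.dta', iterator=True) as reader:
    df = reader.read()                      # the full dataset as a DataFrame
    var_labels = reader.variable_labels()   # {column name: variable label}
    val_labels = reader.value_labels()      # {label set name: {code: label}}

print(var_labels)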
5. JSON Data Processing
Basic operations:
import json

# Python → JSON string
data = {'name': 'Alice', 'age': 25}
json_str = json.dumps(data, indent=2, ensure_ascii=False)

# JSON string → Python
data = json.loads(json_str)

# Write to file
with open('data.json', 'w', encoding='utf-8') as f:
    json.dump(data, f, indent=2, ensure_ascii=False)

# Read from file
with open('data.json', 'r', encoding='utf-8') as f:
    data = json.load(f)

Pandas and JSON interconversion:
# DataFrame → JSON
df.to_json('data.json', orient='records', indent=2, force_ascii=False)

# JSON → DataFrame
df = pd.read_json('data.json')

6. Path Operations
Using pathlib (recommended):
from pathlib import Path

# Create path
data_dir = Path('data')
file_path = data_dir / 'survey.csv'

# Check existence
if file_path.exists():
    print("File exists")

# Create directory
data_dir.mkdir(parents=True, exist_ok=True)

# List files
for file in data_dir.glob('*.csv'):
    print(file.name)

File Format Comparison
| Format | Advantages | Disadvantages | Use Cases |
|---|---|---|---|
| CSV | Small, fast, universal | No formatting, single table per file | Large datasets, plain tabular data |
| Excel | Multiple sheets, formatting | Slower, larger files | Reports, presentation |
| Stata (.dta) | Preserves variable/value labels | Mainly used in social science | Exchanging data with Stata |
| JSON | Nested structures | More verbose, larger files | APIs, configuration |
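In practice, the table suggests keeping working data in CSV (or .dta when Stata labels matter) and exporting to Excel only for presentation; a minimal sketch with hypothetical file names:

import pandas as pd

# Working copy: plain CSV (small, fast, version-control friendly)
df = pd.read_csv('analysis_data.csv')

# Presentation copy: an Excel workbook for sharing with non-programmers
df.to_excel('analysis_report.xlsx', sheet_name='Results', index=False)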
Common Errors
1. Forgetting to Specify Encoding
# Wrong: May cause garbled text
with open('file.txt', 'r') as f:
    content = f.read()

# Correct: Explicitly specify UTF-8
with open('file.txt', 'r', encoding='utf-8') as f:
    content = f.read()

2. Forgetting to Close File
# Error-prone: the file must be closed manually, and it stays open if an exception occurs first
f = open('file.txt', 'r')
content = f.read()
f.close()

# Correct: Use with statement for automatic closing
with open('file.txt', 'r') as f:
    content = f.read()

3. CSV Write Forgetting newline=''
# Wrong: extra blank rows appear between records on Windows
with open('data.csv', 'w') as f:
    writer = csv.writer(f)

# Correct
with open('data.csv', 'w', newline='', encoding='utf-8') as f:
    writer = csv.writer(f)

4. Excel Chinese Text Garbling
# Wrong: Chinese text appears garbled when the CSV is opened in Excel
df.to_csv('output.csv', index=False, encoding='utf-8')

# Correct: utf-8-sig adds a BOM so Excel detects the encoding
df.to_csv('output.csv', index=False, encoding='utf-8-sig')

Best Practices
1. Always Use with Statement
# Good habit
with open('file.txt', 'r', encoding='utf-8') as f:
    content = f.read()

2. Check File Existence
from pathlib import Path

file_path = Path('data.csv')
if file_path.exists():
    df = pd.read_csv(file_path)
else:
    print("File does not exist")

3. Process Large Files in Chunks
# Large CSV file
for chunk in pd.read_csv('large.csv', chunksize=10000):
    process(chunk)

# Large Stata file
for chunk in pd.read_stata('large.dta', chunksize=10000):
    process(chunk)

4. Don't Include Index When Saving
# Usually don't need to save the index
df.to_csv('output.csv', index=False)
df.to_excel('output.xlsx', index=False)
df.to_stata('output.dta', write_index=False)

Programming Exercises
Exercise 1: Text File Log Analysis (Basic)
Difficulty: ⭐⭐ Time: 15 minutes
Analyze a server log file.
"""
Tasks:
1. Read server.log file
2. Count lines containing "ERROR"
3. Extract all error messages
4. Save results to error_report.txt
"""
# Sample log content
# 2024-01-15 10:30:00 INFO Server started
# 2024-01-15 10:30:15 ERROR Connection failed
# 2024-01-15 10:30:20 INFO Request received
# 2024-01-15 10:30:25 ERROR Database timeout

Reference Solution
from datetime import datetime

def analyze_log(log_file):
    """Analyze log file and extract error messages"""
    error_count = 0
    errors = []

    # Read log
    with open(log_file, 'r', encoding='utf-8') as f:
        for line in f:
            if 'ERROR' in line:
                error_count += 1
                errors.append(line.strip())

    # Generate report
    report = []
    report.append("=" * 50)
    report.append("Error Log Analysis Report")
    report.append("=" * 50)
    report.append(f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    report.append(f"Total errors: {error_count}\n")
    report.append("Error details:")
    report.append("-" * 50)
    for i, error in enumerate(errors, 1):
        report.append(f"{i}. {error}")
    report.append("=" * 50)

    # Save report
    with open('error_report.txt', 'w', encoding='utf-8') as f:
        f.write('\n'.join(report))

    print(f"Analysis complete: Found {error_count} errors")
    print("Report saved to error_report.txt")
    return error_count, errors

# Test: Create sample log
sample_log = """2024-01-15 10:30:00 INFO Server started
2024-01-15 10:30:15 ERROR Connection failed to database
2024-01-15 10:30:20 INFO Request received from client
2024-01-15 10:30:25 ERROR Database timeout after 30s
2024-01-15 10:30:30 INFO Processing data
2024-01-15 10:30:35 ERROR Failed to write to disk
2024-01-15 10:30:40 INFO Task completed"""

with open('server.log', 'w', encoding='utf-8') as f:
    f.write(sample_log)

# Analyze
error_count, errors = analyze_log('server.log')

Exercise 2: CSV Data Cleaning and Transformation (Basic)
Difficulty: ⭐⭐ Time: 20 minutes
"""
Tasks:
Given students.csv file (contains: name, age, major, gpa)
1. Read CSV file
2. Filter students with GPA >= 3.5
3. Group by major and calculate average GPA
4. Save results to high_performers.csv
"""

Reference Solution
import pandas as pd

def clean_student_data(input_file, output_file, min_gpa=3.5):
    """Clean student data and generate report"""
    # Read data
    df = pd.read_csv(input_file)
    print(f"Original data: {len(df)} records")

    # Filter high GPA students
    high_performers = df[df['gpa'] >= min_gpa].copy()
    print(f"GPA >= {min_gpa}: {len(high_performers)} students")

    # Group by major and calculate statistics
    major_stats = high_performers.groupby('major').agg({
        'gpa': ['count', 'mean', 'min', 'max']
    }).round(2)
    print("\nStatistics by major:")
    print(major_stats)

    # Save results
    high_performers.to_csv(output_file, index=False, encoding='utf-8-sig')
    print(f"\nResults saved to {output_file}")

    # Save statistics
    major_stats.to_csv('major_statistics.csv', encoding='utf-8-sig')
    print("Statistics saved to major_statistics.csv")

    return high_performers, major_stats

# Test: Create sample data
sample_data = pd.DataFrame({
    'name': ['Alice', 'Bob', 'Carol', 'David', 'Eve', 'Frank'],
    'age': [20, 21, 22, 20, 21, 22],
    'major': ['Economics', 'Sociology', 'Economics', 'Political Science', 'Sociology', 'Economics'],
    'gpa': [3.8, 3.4, 3.9, 3.2, 3.7, 3.6]
})
sample_data.to_csv('students.csv', index=False)

# Process
high_performers, stats = clean_student_data('students.csv', 'high_performers.csv', min_gpa=3.5)

Exercise 3: Multi-format Data Conversion (Intermediate)
Difficulty: ⭐⭐⭐ Time: 30 minutes
"""
Task: Create a multi-format data conversion tool
Features:
1. Read any format (CSV, Excel, Stata, JSON)
2. Convert to other formats
3. Support data cleaning (remove missing values, outliers)
"""

Reference Solution
import pandas as pd
from pathlib import Path
import json

class DataConverter:
    """Multi-format data conversion tool"""

    def __init__(self):
        self.df = None
        self.original_shape = None

    def read_file(self, file_path):
        """Auto-detect and read file"""
        file_path = Path(file_path)
        if not file_path.exists():
            raise FileNotFoundError(f"File not found: {file_path}")

        # Select reading method based on extension
        suffix = file_path.suffix.lower()
        if suffix == '.csv':
            self.df = pd.read_csv(file_path)
        elif suffix in ['.xlsx', '.xls']:
            self.df = pd.read_excel(file_path)
        elif suffix == '.dta':
            self.df = pd.read_stata(file_path)
        elif suffix == '.json':
            self.df = pd.read_json(file_path)
        else:
            raise ValueError(f"Unsupported file format: {suffix}")

        self.original_shape = self.df.shape
        print(f"Read successful: {file_path.name}")
        print(f" Dimensions: {self.df.shape[0]} rows × {self.df.shape[1]} columns")
        return self

    def clean_data(self, drop_na=True, remove_outliers=False, outlier_columns=None):
        """Data cleaning"""
        if self.df is None:
            raise ValueError("Please read data first")

        # Remove missing values
        if drop_na:
            before = len(self.df)
            self.df = self.df.dropna()
            removed = before - len(self.df)
            if removed > 0:
                print(f"Removed {removed} rows with missing data")

        # Remove outliers (using IQR method)
        if remove_outliers and outlier_columns:
            for col in outlier_columns:
                if col in self.df.columns and pd.api.types.is_numeric_dtype(self.df[col]):
                    Q1 = self.df[col].quantile(0.25)
                    Q3 = self.df[col].quantile(0.75)
                    IQR = Q3 - Q1
                    before = len(self.df)
                    self.df = self.df[
                        (self.df[col] >= Q1 - 1.5 * IQR) &
                        (self.df[col] <= Q3 + 1.5 * IQR)
                    ]
                    removed = before - len(self.df)
                    if removed > 0:
                        print(f"Column '{col}': Removed {removed} outliers")
        return self

    def save_as(self, output_path, **kwargs):
        """Save as specified format"""
        if self.df is None:
            raise ValueError("No data to save")

        output_path = Path(output_path)
        suffix = output_path.suffix.lower()
        try:
            if suffix == '.csv':
                self.df.to_csv(output_path, index=False, encoding='utf-8-sig', **kwargs)
            elif suffix in ['.xlsx', '.xls']:
                self.df.to_excel(output_path, index=False, **kwargs)
            elif suffix == '.dta':
                self.df.to_stata(output_path, write_index=False, version=117, **kwargs)
            elif suffix == '.json':
                self.df.to_json(output_path, orient='records', indent=2, force_ascii=False, **kwargs)
            else:
                raise ValueError(f"Unsupported output format: {suffix}")
            print(f"Save successful: {output_path.name}")
            print(f" Dimensions: {self.df.shape[0]} rows × {self.df.shape[1]} columns")
        except Exception as e:
            print(f"Save failed: {e}")

    def get_summary(self):
        """Display data summary"""
        if self.df is None:
            print("No data")
            return

        print("\n" + "=" * 60)
        print("Data Summary")
        print("=" * 60)
        print(f"Original dimensions: {self.original_shape[0]} rows × {self.original_shape[1]} columns")
        print(f"Current dimensions: {self.df.shape[0]} rows × {self.df.shape[1]} columns")
        print("\nColumn information:")
        print(self.df.dtypes)
        print("\nFirst 5 rows:")
        print(self.df.head())
        print("=" * 60)

# Test
if __name__ == "__main__":
    # Create test data
    test_data = pd.DataFrame({
        'id': range(1, 11),
        'age': [25, 30, None, 28, 32, 200, 27, 33, 38, 29],  # Contains missing and outlier values
        'income': [50000, 75000, 85000, None, 70000, 90000, 55000, 80000, 95000, 65000]
    })
    test_data.to_csv('test_input.csv', index=False)

    # Conversion workflow
    print("Example 1: CSV → Excel (with cleaning)")
    converter = DataConverter()
    (converter
     .read_file('test_input.csv')
     .clean_data(drop_na=True, remove_outliers=True, outlier_columns=['age'])
     .save_as('test_output.xlsx'))

    print("\nExample 2: CSV → Stata")
    converter2 = DataConverter()
    (converter2
     .read_file('test_input.csv')
     .clean_data(drop_na=True)
     .save_as('test_output.dta'))

    print("\nExample 3: CSV → JSON")
    converter3 = DataConverter()
    (converter3
     .read_file('test_input.csv')
     .clean_data(drop_na=True)
     .save_as('test_output.json'))

    # Display summary
    converter.get_summary()

Exercise 4: Survey Data Management System (Intermediate)
Difficulty: ⭐⭐⭐ Time: 35 minutes
Create a complete survey data management system.
Reference Solution
import pandas as pd
import json
from pathlib import Path
from datetime import datetime

class SurveyManager:
    """Survey data management system"""

    def __init__(self, survey_name, base_dir='surveys'):
        self.survey_name = survey_name
        self.base_dir = Path(base_dir)
        self.base_dir.mkdir(exist_ok=True)

        # Create subdirectories
        self.raw_dir = self.base_dir / 'raw'
        self.clean_dir = self.base_dir / 'clean'
        self.reports_dir = self.base_dir / 'reports'
        for sub_dir in [self.raw_dir, self.clean_dir, self.reports_dir]:
            sub_dir.mkdir(exist_ok=True)

        self.df = None
        self.metadata = {
            'survey_name': survey_name,
            'created_at': datetime.now().isoformat(),
            'total_responses': 0,
            'valid_responses': 0
        }

    def import_data(self, file_path, file_format='csv'):
        """Import data"""
        file_path = Path(file_path)
        if file_format == 'csv':
            self.df = pd.read_csv(file_path)
        elif file_format == 'excel':
            self.df = pd.read_excel(file_path)
        elif file_format == 'stata':
            self.df = pd.read_stata(file_path)
        else:
            raise ValueError(f"Unsupported format: {file_format}")

        self.metadata['total_responses'] = len(self.df)
        print(f"Imported {len(self.df)} responses")

        # Save raw data
        self._save_raw()
        return self

    def validate_and_clean(self, age_range=(18, 100), income_min=0):
        """Validate and clean data"""
        if self.df is None:
            raise ValueError("Please import data first")

        before = len(self.df)

        # Remove missing values
        self.df = self.df.dropna(subset=['age', 'income'])

        # Filter age
        self.df = self.df[
            (self.df['age'] >= age_range[0]) &
            (self.df['age'] <= age_range[1])
        ]

        # Filter income
        self.df = self.df[self.df['income'] >= income_min]

        after = len(self.df)
        removed = before - after
        self.metadata['valid_responses'] = after
        self.metadata['removed_responses'] = removed
        print(f"Cleaning complete: Removed {removed} invalid records, kept {after} records")

        # Save cleaned data
        self._save_clean()
        return self

    def generate_report(self):
        """Generate analysis report"""
        if self.df is None or len(self.df) == 0:
            print("No data to analyze")
            return

        report = {
            'metadata': self.metadata,
            'summary': {
                'age': {
                    'mean': float(self.df['age'].mean()),
                    'median': float(self.df['age'].median()),
                    'min': int(self.df['age'].min()),
                    'max': int(self.df['age'].max())
                },
                'income': {
                    'mean': float(self.df['income'].mean()),
                    'median': float(self.df['income'].median()),
                    'min': float(self.df['income'].min()),
                    'max': float(self.df['income'].max())
                }
            }
        }

        # Save JSON report
        report_file = self.reports_dir / f'{self.survey_name}_report.json'
        with open(report_file, 'w', encoding='utf-8') as f:
            json.dump(report, f, indent=2, ensure_ascii=False)

        # Generate text report
        self._generate_text_report(report)
        print("Report generated")
        return report

    def export(self, format='csv'):
        """Export data"""
        if self.df is None:
            raise ValueError("No data to export")

        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        filename = f"{self.survey_name}_{timestamp}"

        if format == 'csv':
            output_path = self.clean_dir / f"{filename}.csv"
            self.df.to_csv(output_path, index=False, encoding='utf-8-sig')
        elif format == 'excel':
            output_path = self.clean_dir / f"{filename}.xlsx"
            self.df.to_excel(output_path, index=False)
        elif format == 'stata':
            output_path = self.clean_dir / f"{filename}.dta"
            self.df.to_stata(output_path, write_index=False, version=117)
        else:
            raise ValueError(f"Unsupported format: {format}")
        print(f"Export successful: {output_path}")

    def _save_raw(self):
        """Save raw data"""
        raw_file = self.raw_dir / f'{self.survey_name}_raw.csv'
        self.df.to_csv(raw_file, index=False, encoding='utf-8-sig')

    def _save_clean(self):
        """Save cleaned data"""
        clean_file = self.clean_dir / f'{self.survey_name}_clean.csv'
        self.df.to_csv(clean_file, index=False, encoding='utf-8-sig')

    def _generate_text_report(self, report):
        """Generate text report"""
        lines = []
        lines.append("=" * 60)
        lines.append(f"Survey Analysis Report: {self.survey_name}")
        lines.append("=" * 60)
        lines.append(f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        lines.append("\nData Overview:")
        lines.append(f" Total responses: {self.metadata['total_responses']}")
        lines.append(f" Valid responses: {self.metadata['valid_responses']}")
        lines.append(f" Removed: {self.metadata.get('removed_responses', 0)}")
        lines.append("\nAge Statistics:")
        age_stats = report['summary']['age']
        lines.append(f" Mean: {age_stats['mean']:.1f}")
        lines.append(f" Median: {age_stats['median']:.1f}")
        lines.append(f" Range: {age_stats['min']} - {age_stats['max']}")
        lines.append("\nIncome Statistics:")
        income_stats = report['summary']['income']
        lines.append(f" Mean: ${income_stats['mean']:,.0f}")
        lines.append(f" Median: ${income_stats['median']:,.0f}")
        lines.append(f" Range: ${income_stats['min']:,.0f} - ${income_stats['max']:,.0f}")
        lines.append("=" * 60)

        report_file = self.reports_dir / f'{self.survey_name}_report.txt'
        with open(report_file, 'w', encoding='utf-8') as f:
            f.write('\n'.join(lines))

# Test
if __name__ == "__main__":
    # Create test data (includes out-of-range ages, a negative income, and missing values)
    test_data = pd.DataFrame({
        'id': range(1, 21),
        'age': [25, 30, 22, 28, 32, 15, 27, 33, 38, 29,
                35, 40, 26, 31, 24, None, 36, 29, 34, 150],
        'income': [50000, 75000, 45000, 60000, 70000, 30000, 55000, 80000, 95000, 65000,
                   85000, 90000, 52000, 72000, 48000, None, 88000, 67000, 78000, -1000]
    })
    test_data.to_csv('survey_raw.csv', index=False)

    # Use the management system
    # Note: generate_report() returns the report dict, not self, so it cannot be chained further
    manager = SurveyManager('income_survey_2024')
    (manager
     .import_data('survey_raw.csv', file_format='csv')
     .validate_and_clean(age_range=(18, 65), income_min=0))
    manager.generate_report()
    manager.export(format='excel')

    print("\nProcessing complete! Check surveys/ directory for results")

Exercise 5: Batch File Processing (Advanced)
Difficulty: ⭐⭐⭐⭐ Time: 40 minutes
Batch process multiple data files and merge them.
"""
Tasks:
1. Read all CSV files in data/ folder
2. Each file represents one year of survey data
3. Merge all data, add year column
4. Clean outliers
5. Generate annual comparison report
6. Save in multiple formats (CSV, Excel, Stata)
"""

Reference Solution (Simplified)
import pandas as pd
from pathlib import Path
import re

class BatchProcessor:
    """Batch file processor"""

    def __init__(self, input_dir, output_dir='output'):
        self.input_dir = Path(input_dir)
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(exist_ok=True)
        self.all_data = []

    def process_all_files(self, pattern='*.csv'):
        """Process all files"""
        files = list(self.input_dir.glob(pattern))
        print(f"Found {len(files)} files")

        for file in files:
            # Extract year from filename
            year_match = re.search(r'(\d{4})', file.stem)
            year = int(year_match.group(1)) if year_match else None

            df = pd.read_csv(file)
            df['year'] = year
            df['source_file'] = file.name
            self.all_data.append(df)
            print(f"{file.name}: {len(df)} rows")

        # Merge
        self.combined_df = pd.concat(self.all_data, ignore_index=True)
        print(f"\nMerge complete: {len(self.combined_df)} rows")
        return self

    def clean_data(self):
        """Clean data"""
        before = len(self.combined_df)
        self.combined_df = self.combined_df.dropna()
        self.combined_df = self.combined_df[
            (self.combined_df['age'] >= 18) &
            (self.combined_df['age'] <= 100) &
            (self.combined_df['income'] > 0)
        ]
        after = len(self.combined_df)
        print(f"Cleaning: Removed {before - after} rows, kept {after} rows")
        return self

    def generate_yearly_report(self):
        """Generate annual report"""
        yearly_stats = self.combined_df.groupby('year').agg({
            'age': ['count', 'mean'],
            'income': ['mean', 'median']
        }).round(2)
        print("\nAnnual statistics:")
        print(yearly_stats)

        # Save report
        yearly_stats.to_csv(self.output_dir / 'yearly_report.csv')
        return self

    def export_all_formats(self, base_name='combined'):
        """Export all formats"""
        self.combined_df.to_csv(
            self.output_dir / f'{base_name}.csv',
            index=False, encoding='utf-8-sig'
        )
        self.combined_df.to_excel(
            self.output_dir / f'{base_name}.xlsx',
            index=False
        )
        self.combined_df.to_stata(
            self.output_dir / f'{base_name}.dta',
            write_index=False, version=117
        )
        print(f"Export complete: {base_name}.{{csv,xlsx,dta}}")

# Usage example
# processor = BatchProcessor('data/')
# (processor
#  .process_all_files(pattern='survey_*.csv')
#  .clean_data()
#  .generate_yearly_report()
#  .export_all_formats('panel_data'))

Next Steps
After completing this module, you have mastered:
- Text file reading and writing
- CSV/Excel processing
- Stata file reading and writing
- JSON data processing
- Path operations and file management
Congratulations on completing Module 7!
In Module 8, we will learn exception handling and debugging.
Ready to learn error handling?