Lưu trữ dữ liệu tối ưu với Parquet
Cập nhật lần cuối:
Thảo luậnMục lục
Giới thiệu
Tính năng mới này cung cấp khả năng lưu trữ dữ liệu chứng khoán hiệu quả hơn bằng định dạng Parquet, kèm theo cấu trúc thư mục rõ ràng để dễ quản lý.
Cấu trúc thư mục
Dữ liệu được tổ chức theo cấu trúc sau:
Shell
data/
├── intraday/ # Loại dữ liệu (vd: intraday, daily, financials)
│ ├── YYYY-MM-DD/ # Ngày dữ liệu
│ │ ├── VNM.parquet
│ │ └── FPT.parquet
│ └── YYYY-MM-DD/
│ └── VNM.parquet
└── daily/ # Một loại dữ liệu khác
└── YYYY-MM-DD/
└── VNM.parquetCách sử dụng
1. Sử dụng ParquetExport đơn giản
Python
"""
Ví dụ đơn giản sử dụng DataManager để lưu trữ dữ liệu intraday.
"""
from vnstock_pipeline.core.data_manager import DataManager
from vnstock_pipeline.tasks.intraday import IntradayFetcher, IntradayTransformer, IntradayValidator
from pathlib import Path
# Khởi tạo data manager
data_manager = DataManager(Path("./data"))
# Khởi tạo các thành phần pipeline
fetcher = IntradayFetcher()
validator = IntradayValidator()
transformer = IntradayTransformer()
# Lấy và xử lý dữ liệu
ticker = "VNM"
print(f"Đang lấy dữ liệu intraday cho {ticker}...")
try:
# 1. Lấy dữ liệu từ API
raw_data = fetcher.fetch(ticker)
# 2. Kiểm tra dữ liệu
if validator.validate(raw_data):
# 3. Chuyển đổi dữ liệu
processed_data = transformer.transform(raw_data)
if len(processed_data) > 0:
# 4. Lưu dữ liệu
file_path = data_manager.save_data(
data=processed_data,
ticker=ticker,
data_type="intraday",
date="2025-08-30"
)
print(f"✅ Đã lưu {len(processed_data)} bản ghi vào {file_path}")
# 5. Xem trước dữ liệu
preview = data_manager.load_data(ticker, "intraday", "2025-08-30")
print(f"\nDữ liệu mới nhất:")
print(preview.tail(3))
else:
print("Không có dữ liệu để lưu")
else:
print("❌ Dữ liệu không hợp lệ")
except Exception as e:
print(f"❌ Lỗi: {str(e)}")2. Sử dụng DataManager để quản lý dữ liệu
Python
from vnstock_pipeline.core.data_manager import DataManager
import pandas as pd
# Khởi tạo DataManager
dm = DataManager("path/to/data")
# Lưu dữ liệu
# Dữ liệu sẽ được lưu vào: path/to/data/intraday/2025-08-30/VNM.parquet
dm.save_data(
data=data_frame, # DataFrame chứa dữ liệu
ticker="VNM",
data_type="intraday",
date="2025-08-30" # Nếu bỏ qua, mặc định là ngày hiện tại
)
# Đọc dữ liệu
# Đọc dữ liệu 2 ngày gần nhất
data = dm.load_data(
ticker="VNM",
data_type="intraday",
start_date="2025-08-29",
end_date="2025-08-30"
)
# Liệt kê dữ liệu có sẵn
available = dm.list_available_data("intraday")
print(available)
# Xóa dữ liệu
# Xóa dữ liệu của một mã cổ phiếu trong một ngày
dm.delete_data("intraday", ticker="VNM", date="2025-08-30")
# Xóa toàn bộ dữ liệu của một loại
dm.delete_data("intraday")Lợi ích của việc sử dụng Parquet
- Tiết kiệm dung lượng: Dữ liệu được nén hiệu quả, tiết kiệm tới 75% dung lượng so với CSV
- Tốc độ đọc/ghi nhanh: Đọc ghi nhanh hơn nhiều so với CSV, đặc biệt với khối lượng dữ liệu lớn
- Hỗ trợ schema: Tự động bảo toàn kiểu dữ liệu
- Hỗ trợ cắt cột: Chỉ đọc những cột cần thiết
- Tương thích: Làm việc tốt với các công cụ phân tích dữ liệu hiện đại
Chương trình mẫu
Lưu đoạn code sau thành file orderbook_parquet_daily.py sau đó chạy script với Python.
Python
"""
Pipeline tải dữ liệu intraday cuối ngày và lưu trữ bằng định dạng Parquet.
Chạy 1 lần vào cuối phiên giao dịch để tạo cơ sở dữ liệu.
"""
import time
from datetime import datetime, time as dtime, timedelta
from pathlib import Path
from vnstock.core.utils.market import trading_hours
from vnstock_pipeline.core.data_manager import DataManager
from vnstock_pipeline.tasks.intraday import (
IntradayFetcher,
IntradayTransformer,
IntradayValidator,
)
# Cấu hình
BASE_DATA_DIR = Path("./data")
TICKERS = [
'ACB', 'BCM', 'BID', 'BVH', 'CTG', 'FPT', 'GAS', 'GVR', 'HDB', 'HPG',
'LPB', 'MBB', 'MSN', 'MWG', 'PLX', 'SAB', 'SHB', 'SSB', 'SSI', 'STB',
'TCB', 'TPB', 'VCB', 'VHM', 'VIB', 'VIC', 'VJC', 'VNM', 'VPB', 'VRE'
]
# Thời gian chờ giữa các lần gọi API (giây)
API_DELAY = 1
def get_current_session_date():
"""Lấy ngày hiện tại dưới dạng YYYY-MM-DD."""
return datetime.now().strftime("%Y-%m-%d")
def process_intraday_data():
"""Xử lý và lưu trữ dữ liệu intraday."""
# Khởi tạo các thành phần pipeline
fetcher = IntradayFetcher()
validator = IntradayValidator()
transformer = IntradayTransformer()
data_manager = DataManager(BASE_DATA_DIR)
# Lấy ngày hiện tại
current_date = get_current_session_date()
print(f"\n=== Bắt đầu cập nhật dữ liệu ngày {current_date} ===")
success_count = 0
error_count = 0
for i, ticker in enumerate(TICKERS, 1):
try:
print(f"\n[{i}/{len(TICKERS)}] Đang xử lý {ticker}...")
# 1. Lấy dữ liệu
df = fetcher.fetch(ticker)
# 2. Kiểm tra dữ liệu
if not validator.validate(df):
print(" ⚠️ Lỗi kiểm tra dữ liệu, bỏ qua...")
error_count += 1
continue
# 3. Chuyển đổi dữ liệu
df = transformer.transform(df)
if len(df) == 0:
print(" ℹ️ Không có dữ liệu mới")
continue
# 4. Lưu dữ liệu dưới dạng Parquet
file_path = data_manager.save_data(
data=df,
ticker=ticker,
data_type="intraday",
date=current_date
)
# 5. In thông báo
last_row = df.iloc[-1]
try:
close_price = float(last_row.get('close', 0))
close_str = f"{close_price:,.2f}"
except (ValueError, TypeError):
close_str = str(last_row.get('close', 'N/A'))
msg = (
f" ✅ Đã lưu {len(df)} bản ghi vào {file_path}"
f"\n ⏰ Thời gian cuối: {last_row.get('time', 'N/A')}"
f"\n 💰 Giá đóng cửa: {close_str}"
)
print(msg)
success_count += 1
# Nghỉ giữa các lần gọi API
if i < len(TICKERS):
time.sleep(API_DELAY)
except Exception as e:
error_count += 1
print(f" ❌ Lỗi: {str(e)}")
# Tổng kết
print("\n=== TỔNG KẾT ===")
print(f"Thành công: {success_count}/{len(TICKERS)}")
print(f"Lỗi: {error_count}/{len(TICKERS)}")
print(f"Hoàn thành lúc: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
def wait_until_market_close():
"""
Kiểm tra thời gian giao dịch và quyết định có cần chờ không.
Sử dụng hàm trading_hours từ thư viện vnstock để xác định trạng thái thị trường.
Trả về True nếu cần chạy cập nhật dữ liệu, False nếu không cần.
"""
# Lấy thông tin giờ giao dịch từ thư viện vnstock
market_status = trading_hours(market="HOSE", enable_log=False, language="vi")
# Nếu là cuối tuần
if market_status["trading_session"] == "weekend":
print("Hôm nay là cuối tuần, không cần cập nhật dữ liệu.")
return False
# Nếu đang trong giờ giao dịch, chờ đến khi kết thúc
if market_status["is_trading_hour"]:
now = datetime.now()
close_time = now.replace(hour=15, minute=0, second=0, microsecond=0)
wait_seconds = (close_time - now).total_seconds()
if wait_seconds > 0:
wait_minutes = int(wait_seconds / 60)
print(f"Đang trong giờ giao dịch, chờ đến 15:00 (còn {wait_minutes} phút)...")
time.sleep(wait_seconds)
# Ngoài giờ giao dịch, chạy ngay
print("Ngoài giờ giao dịch, tiến hành cập nhật dữ liệu ngay...")
return True
if __name__ == "__main__":
print("=== CHƯƠNG TRÌNH CẬP NHẬT DỮ LIỆU INTRADAY ===")
print(f"Theo dõi {len(TICKERS)} mã cổ phiếu")
print("Chương trình sẽ chạy vào cuối phiên giao dịch (15:00)")
print("Nhấn Ctrl+C để dừng chương trình\n")
try:
# Kiểm tra xem có cần chạy cập nhật không
should_run = wait_until_market_close()
if should_run:
# Chạy cập nhật dữ liệu
process_intraday_data()
else:
print("Không cần cập nhật dữ liệu vào thời điểm này.")
except KeyboardInterrupt:
print("\nĐã dừng chương trình.")
except Exception as e:
print(f"\nLỗi không mong muốn: {str(e)}")Mẹo sử dụng
- Chọn cột khi đọc: Luôn chỉ định các cột cần thiết để tăng tốc độ đọc
Python
# Chỉ đọc các cột được chọn
dm.load_data("VNM", "intraday", columns=["time", "price", "volume"])- Lọc dữ liệu khi đọc: Sử dụng bộ lọc để giảm lượng dữ liệu cần đọc
Python
# Chỉ đọc dữ liệu có giá đóng cửa > 100
filters = [("price", "<=", 60.3), ("volume", ">", 10000)]
data = dm.load_data("VNM", "intraday", filters=filters)- Xử lý dữ liệu lớn: Với dữ liệu lớn, sử dụng iterator để xử lý từng phần
Python
dataset = ds.dataset("path/to/data/intraday")
for batch in dataset.to_batches():
df = batch.to_pandas()
# Xử lý từng batchXử lý lỗi thường gặp
- Lỗi thiếu thư viện: Đảm bảo đã cài đặt đầy đủ các thư viện phụ thuộc là
pyarrow - Lỗi kiểu dữ liệu: Kiểm tra kiểu dữ liệu trước khi lưu
- Quyền truy cập: Đảm bảo có quyền ghi vào thư mục đích
Lời kết
Tính năng mới này cung cấp một giải pháp lưu trữ dữ liệu hiệu quả và dễ sử dụng, giúp tối ưu hiệu năng và dễ dàng mở rộng cho các ứng dụng phân tích dữ liệu chứng khoán.
Thảo luận