Vnstock Logo

Tùy chỉnh Pipeline của bạn

Cập nhật lần cuối:

Thảo luận

Tùy chỉnh pipeline

vnstock_pipeline cung cấp ba phương pháp chính để tùy chỉnh pipeline:

  1. Sử dụng template có sẵn với tham số tùy chỉnh
  2. Mở rộng các lớp template
  3. Tạo pipeline hoàn toàn mới

Sử dụng template có sẵn với tham số tùy chỉnh

Đây là cách đơn giản nhất để tùy chỉnh pipeline. Bạn chỉ cần gọi các hàm task có sẵn với các tham số phù hợp với nhu cầu của mình.

from vnstock_pipeline.tasks.financial import run_financial_task

# Tùy chỉnh các tham số
run_financial_task(
    tickers=["ACB", "VCB", "HPG"],
    balance_sheet_period="quarter",  # Thay đổi từ "year" sang "quarter"
    lang="en",                       # Thay đổi ngôn ngữ sang tiếng Anh
    dropna=False                     # Giữ lại các giá trị NaN
)

Mở rộng các lớp template

Khi bạn cần tùy chỉnh logic xử lý dữ liệu, bạn có thể mở rộng các lớp template có sẵn.

from vnstock_pipeline.template.vnstock import VNFetcher, VNValidator, VNTransformer
from vnstock_pipeline.core.scheduler import Scheduler
from vnstock_pipeline.core.exporter import CSVExport
import pandas as pd

# Tùy chỉnh Fetcher
class CustomIntradayFetcher(VNFetcher):
    def _vn_call(self, ticker: str, **kwargs) -> pd.DataFrame:
        # Tùy chỉnh cách lấy dữ liệu
        # Ví dụ: thêm caching, thay đổi tham số, v.v.
        page_size = kwargs.get("page_size", 100000)  # Tăng page_size
        stock = Vnstock().stock(symbol=ticker, source="VCI")
        return stock.quote.intraday(page_size=page_size)

# Tùy chỉnh Transformer
class CustomIntradayTransformer(VNTransformer):
    def transform(self, data: pd.DataFrame) -> pd.DataFrame:
        # Gọi phương thức transform của lớp cha
        df = super().transform(data)
        
        # Thêm xử lý tùy chỉnh
        if "price" in df.columns:
            df["price_change"] = df["price"].diff()
        
        return df

# Sử dụng các lớp tùy chỉnh
fetcher = CustomIntradayFetcher()
validator = VNValidator()  # Sử dụng validator mặc định
transformer = CustomIntradayTransformer()
exporter = CSVExport(base_path="./data/custom_intraday")

# Tạo scheduler và chạy
scheduler = Scheduler(fetcher, validator, transformer, exporter)
scheduler.run(["ACB", "VCB", "HPG"])

Tạo pipeline hoàn toàn mới

Khi bạn cần một pipeline hoàn toàn tùy chỉnh, bạn có thể triển khai trực tiếp các lớp trừu tượng trong module core.

import pandas as pd
import time
from pathlib import Path

# Tạo thư mục lưu trữ dữ liệu
DATA_DIR = Path("./data/custom_intraday")
DATA_DIR.mkdir(parents=True, exist_ok=True)

# Danh sách mã chứng khoán
TICKERS = ['ACB', 'VCB', 'HPG', 'VNM', 'VIC']

# Hàm tùy chỉnh để lấy dữ liệu
def fetch_data(ticker):
    # Triển khai logic lấy dữ liệu của bạn ở đây
    from vnstock import Vnstock
    stock = Vnstock().stock(symbol=ticker, source="VCI")
    return stock.quote.intraday()

# Hàm tùy chỉnh để xác thực dữ liệu
def validate_data(data):
    if not isinstance(data, pd.DataFrame) or data.empty:
        return False
    required_columns = ["time", "price", "volume"]
    return all(col in data.columns for col in required_columns)

# Hàm tùy chỉnh để chuyển đổi dữ liệu
def transform_data(data):
    df = data.copy()
    # Chuyển đổi thời gian
    df["time"] = pd.to_datetime(df["time"])
    # Sắp xếp theo thời gian
    df = df.sort_values("time")
    # Thêm cột giá trị giao dịch
    if "price" in df.columns and "volume" in df.columns:
        df["value"] = df["price"] * df["volume"]
    return df

# Hàm tùy chỉnh để xuất dữ liệu
def export_data(data, ticker):
    file_path = DATA_DIR / f"{ticker}.csv"
    data.to_csv(file_path, index=False)
    print(f"Đã lưu dữ liệu cho {ticker} vào {file_path}")

# Hàm chính để chạy pipeline
def run_custom_pipeline():
    for ticker in TICKERS:
        try:
            # Lấy dữ liệu
            print(f"Đang lấy dữ liệu cho {ticker}...")
            data = fetch_data(ticker)
            
            # Xác thực dữ liệu
            if not validate_data(data):
                print(f"Dữ liệu không hợp lệ cho {ticker}")
                continue
            
            # Chuyển đổi dữ liệu
            transformed_data = transform_data(data)
            
            # Xuất dữ liệu
            export_data(transformed_data, ticker)
            
        except Exception as e:
            print(f"Lỗi khi xử lý {ticker}: {e}")

if __name__ == "__main__":
    run_custom_pipeline()

Thảo luận

Đang tải bình luận...