跳转至

demo2-代码

import requests
import os  # needed here: the module's own `import os` only appears further down

# Connection settings used by call_api(). Both can be overridden from the
# environment so the credential does not have to live in source control; the
# literals are fallbacks that keep the previous behaviour when nothing is set.
# A trailing "/" on the host is stripped because API paths start with "/".
API_HOST = os.environ.get("IDATA_API_HOST", "http://localhost:8000").rstrip("/")
TOKEN = os.environ.get("IDATA_API_TOKEN", "970e8f84a06xxxxx25346f0356050ba")

def call_api(method: str, api_path: str, params: object = None, timeout: float = 30):
    """Send one JSON request to the API and return the decoded response body.

    Args:
        method: HTTP verb passed to ``requests.request`` (e.g. "GET").
        api_path: path appended to API_HOST; callers pass it with a leading "/".
        params: JSON-serialisable payload sent as the request body;
            ``None`` sends no body.
        timeout: seconds before the request is abandoned (default 30).

    Returns:
        The decoded JSON body for a 2xx response. ``0`` when the request
        fails, the server answers with a non-2xx status, or the body is not
        valid JSON. The status helpers in this script treat any falsy result
        as "no record".
    """
    url = f"{API_HOST}{api_path}"
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Token {TOKEN}",
    }
    try:
        response = requests.request(method, url, json=params, headers=headers, timeout=timeout)
        response.raise_for_status()
        return response.json()
    # ValueError covers a non-JSON body on requests versions whose
    # JSONDecodeError is not a RequestException subclass.
    except (requests.exceptions.RequestException, ValueError) as e:
        # Use the script's logger (file + console) instead of bare print().
        logging.warning(f"API调用失败: {str(e)}")
        return 0

def QC_status(run: str) -> bool:
    """Return True if the common_fastp endpoint has a record for ``run``.

    Only a JSON object containing a "run" key counts as a record. Any other
    decoded body is rejected explicitly: for a string, ``"run" in res`` would
    be a substring match (a false positive here leads to raw input being
    deleted), and for a number it would raise TypeError.
    """
    res = call_api("GET", f"/Intermediate/common_fastp/{run}", {})
    return isinstance(res, dict) and "run" in res

def mapping_status(sample_id: str) -> tuple:
    """Return ``(found, mean_depth)`` for a sample's srWGS mapping record.

    ``found`` is True only when the API returns a JSON object containing a
    "sample_id" key. In that case ``mean_depth`` is its "mean_depth" field
    converted with ``float()`` (0.0 when the field is absent). A value
    ``float()`` cannot convert raises. Otherwise, including failed or
    non-object responses, the result is ``(False, -1.0)``.
    """
    res = call_api("GET", f"/Intermediate/srWGS_mapping/{sample_id}", {})
    # Guard on dict: `in` on a str body is a substring test, and .get() would
    # raise AttributeError on a list.
    if isinstance(res, dict) and "sample_id" in res:
        return (True, float(res.get('mean_depth', 0)))
    return (False, -1.0)

def calling_status(sample_id: str) -> bool:
    """Return True if the srWGS_calling endpoint has a record for the sample.

    Only a JSON object containing a "sample_id" key counts as a record; any
    other decoded body (string, list, number, or the 0 returned on failure)
    yields False instead of a substring match or a TypeError.
    """
    res = call_api("GET", f"/Intermediate/srWGS_calling/{sample_id}", {})
    return isinstance(res, dict) and "sample_id" in res
import os
import shutil
import logging
from time import sleep
from typing import Tuple

# Logging configuration: every record goes both to file_cleaner.log (in the
# current working directory) and to the console.
# The log file is opened as UTF-8 explicitly because the messages are in
# Chinese. Without it the handler uses the platform default encoding, and
# under a non-UTF-8 locale those records fail to be written.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('file_cleaner.log', encoding='utf-8'),
        logging.StreamHandler(),
    ],
)

# Filesystem layout: every working directory hangs off BASE_DIR.
# NOTE(review): "anaylysis" is spelled this way in the literals, presumably to
# match the folder on disk; do not correct it here without renaming on disk.
BASE_DIR = "/share/home/lxl/08Users/yhfu/04Projects/006_idata_sheep"
INPUT_DIR, RESULTS_DIR, OK_RESULTS_DIR = (
    os.path.join(BASE_DIR, relative)
    for relative in ("data/input", "anaylysis/work/results", "anaylysis/ok_results")
)

# Retry policy used by call_with_retry(): number of attempts, and the pause
# in seconds passed to sleep() between attempts.
MAX_RETRIES, RETRY_DELAY = 3, 5


def call_with_retry(func, *args, **kwargs):
    """Call ``func(*args, **kwargs)``, retrying while it raises.

    Makes up to MAX_RETRIES attempts, logging each failure. Between
    consecutive attempts it sleeps RETRY_DELAY seconds; it does not sleep
    after the final attempt. The first successful return value is returned
    unchanged.

    Raises:
        RuntimeError: if every attempt raised. The last underlying exception
            is chained as ``__cause__`` so its traceback is not lost.
    """
    # functools.partial objects and some callables have no __name__.
    name = getattr(func, "__name__", repr(func))
    last_error = None
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            last_error = e
            logging.warning(f"接口调用失败(尝试 {attempt}/{MAX_RETRIES}): {str(e)}")
            if attempt < MAX_RETRIES:
                sleep(RETRY_DELAY)
    raise RuntimeError(f"接口调用失败超过最大重试次数: {name}") from last_error

def safe_remove(path: str):
    """Delete a file, symlink, or directory tree on a best-effort basis.

    - A symlink is removed itself and never followed, so its target survives.
      This also covers broken links and links to directories (``rmtree``
      refuses those).
    - A regular file is removed with ``os.remove``.
    - A directory is removed recursively with ``shutil.rmtree``.
    - Anything else (missing path, special file) is logged and left alone.

    Errors are logged, never raised, so one failure does not stop a cleanup run.
    """
    try:
        if os.path.islink(path):
            os.remove(path)
            logging.info(f"已删除链接: {path}")
        elif os.path.isfile(path):
            os.remove(path)
            logging.info(f"已删除文件: {path}")
        elif os.path.isdir(path):
            shutil.rmtree(path)
            logging.info(f"已删除目录: {path}")
        else:
            logging.warning(f"路径不存在或类型不支持, 未删除: {path}")
    except Exception as e:
        logging.error(f"删除操作失败: {path} - {str(e)}")

def move_to_ok(src: str, category: str) -> bool:
    """Move ``src`` into ``OK_RESULTS_DIR/<category>/``, keeping its base name.

    Never overwrites. If the destination already exists, ``shutil.move``
    would put a directory *inside* it (producing ``name/name``) or replace
    a file, so the move is skipped and logged instead. All errors, including
    failing to create the category directory, are logged rather than raised.

    Returns:
        True if ``src`` was moved, False otherwise.
    """
    dest_dir = os.path.join(OK_RESULTS_DIR, category)
    # normpath drops a trailing separator, which would make basename() "".
    dest = os.path.join(dest_dir, os.path.basename(os.path.normpath(src)))
    try:
        if os.path.lexists(dest):
            logging.error(f"移动失败: 目标已存在 {src} -> {dest}")
            return False
        os.makedirs(dest_dir, exist_ok=True)
        shutil.move(src, dest)
    except Exception as e:
        logging.error(f"移动失败: {src} -> {dest} - {str(e)}")
        return False
    logging.info(f"成功移动: {src} -> {dest}")
    return True

def process_input():
    """Delete raw input sample directories once QC is recorded for all runs.

    Every sub-directory of INPUT_DIR is treated as one sample. Run IDs come
    from its ``*.fastq.gz`` file names: strip the suffix and keep the text
    before the first ``_``. So ``RUN_1.fastq.gz`` gives ``RUN``, and so does
    ``RUN.fastq.gz``. The directory is deleted only when it yields at least
    one run ID and QC_status() is true for every run.

    A failure on one sample is logged and the remaining samples are still
    processed, matching the other process_* stages.
    """
    suffix = '.fastq.gz'
    for sample in os.listdir(INPUT_DIR):
        sample_path = os.path.join(INPUT_DIR, sample)
        if not os.path.isdir(sample_path):
            continue

        try:
            # Strip the suffix before splitting, so a name without "_" gives
            # the bare run ID rather than the whole file name.
            runs = {
                name[:-len(suffix)].split('_')[0]
                for name in os.listdir(sample_path)
                if name.endswith(suffix)
            }
            runs.discard('')  # e.g. "_1.fastq.gz" has no usable run ID
            # Never delete a directory without any recognisable run.
            if not runs:
                continue
            # all() short-circuits: lookups stop at the first run lacking QC.
            if all(call_with_retry(QC_status, run) for run in runs):
                safe_remove(sample_path)
        except Exception as e:
            logging.error(f"处理input样本失败: {sample} - {str(e)}")

def process_qc():
    """Archive QC sample directories whose sample has a mapping record.

    For each sub-directory of RESULTS_DIR/QC: when mapping_status() reports
    the sample as found, every entry ending in ``.gz`` inside it is deleted,
    then the directory is moved to OK_RESULTS_DIR/QC. Non-directory entries
    are skipped. Errors for one sample are logged and processing continues.
    """
    qc_dir = os.path.join(RESULTS_DIR, "QC")
    for sample in os.listdir(qc_dir):
        sample_path = os.path.join(qc_dir, sample)
        # Only sample directories are expected; a stray file would make the
        # os.listdir() below fail with NotADirectoryError.
        if not os.path.isdir(sample_path):
            continue

        try:
            ready, _ = call_with_retry(mapping_status, sample)
            if not ready:
                continue
            # Remove the .gz files first, then archive what remains.
            for name in os.listdir(sample_path):
                if name.endswith('.gz'):
                    safe_remove(os.path.join(sample_path, name))
            move_to_ok(sample_path, "QC")
        except Exception as e:
            logging.error(f"处理QC样本失败: {sample} - {str(e)}")

def process_mapping(min_depth: float = 20):
    """Archive or discard mapping results once calling has a record.

    For each entry under RESULTS_DIR/mapping whose calling_status() is true:

    - mean depth >= ``min_depth``: move the entry to OK_RESULTS_DIR/mapping;
    - mean depth below it: delete the entry.

    Entries whose mapping record is not found are left untouched and logged.
    mapping_status() reports a missing or failed lookup as ``(False, -1.0)``,
    and -1.0 must not be mistaken for a real low depth that warrants deletion.

    Args:
        min_depth: minimum mean depth for keeping mapping results (default 20).
    """
    mapping_dir = os.path.join(RESULTS_DIR, "mapping")
    for sample in os.listdir(mapping_dir):
        sample_path = os.path.join(mapping_dir, sample)

        try:
            if not call_with_retry(calling_status, sample):
                continue
            # Depth is only needed once calling is done, so query it lazily.
            mapped, depth = call_with_retry(mapping_status, sample)
            if not mapped:
                logging.warning(f"未找到mapping记录, 跳过: {sample}")
                continue

            if depth >= min_depth:
                move_to_ok(sample_path, "mapping")
            else:
                safe_remove(sample_path)
        except Exception as e:
            logging.error(f"处理mapping样本失败: {sample} - {str(e)}")

def process_calling():
    """Archive every calling result that has a calling record.

    Walks RESULTS_DIR/calling and moves each entry for which calling_status()
    is true into OK_RESULTS_DIR/calling. A failure on one entry is logged and
    the walk continues.
    """
    calling_root = os.path.join(RESULTS_DIR, "calling")
    for name in os.listdir(calling_root):
        target = os.path.join(calling_root, name)

        try:
            if not call_with_retry(calling_status, name):
                continue
            move_to_ok(target, "calling")
        except Exception as err:
            logging.error(f"处理calling样本失败: {name} - {str(err)}")

def main():
    """Run the four cleanup stages in order: input, QC, mapping, calling.

    Each stage runs in its own try/except. An unexpected failure in one stage
    (e.g. its directory is missing, so os.listdir raises) is logged with a
    traceback, and the later stages still run. The stages are independent:
    each decides what to do from the API status, not from the other stages'
    results. A summary line reports any stages that failed.
    """
    logging.info("=== 开始文件清理流程 ===")
    failed = []
    for stage in (process_input, process_qc, process_mapping, process_calling):
        try:
            stage()
        except Exception:
            failed.append(stage.__name__)
            logging.exception(f"处理阶段异常: {stage.__name__}")

    if failed:
        logging.error(f"主流程结束, 以下阶段失败: {', '.join(failed)}")
    else:
        logging.info("=== 所有处理完成 ===")


if __name__ == "__main__":
    main()
本文阅读量  次
本站总访问量  次
Authors: Wind