demo2-代码
import requests
# Base URL of the status API; call_api appends the request path to it verbatim.
API_HOST = "http://localhost:8000"
# Token placed in the "Authorization: Token ..." header by call_api.
# NOTE(review): the credential is hard-coded in source — consider loading it
# from the environment or a config file instead.
TOKEN = "970e8f84a06xxxxx25346f0356050ba"
def call_api(method: str, api_path: str, params: object):
    """Send one JSON request to the status API and return the decoded reply.

    Args:
        method: HTTP verb handed to ``requests.request`` (for example ``"GET"``).
        api_path: Path appended verbatim to ``API_HOST``.
        params: JSON-serialisable payload sent as the request body.

    Returns:
        The decoded JSON body on success, or ``0`` when the request fails,
        the server answers with an error status, or the body is not valid
        JSON. Callers rely on the falsy ``0`` to mean "no usable answer".
    """
    url = f"{API_HOST}{api_path}"
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Token {TOKEN}",
    }
    try:
        response = requests.request(method, url, json=params, headers=headers, timeout=30)
        response.raise_for_status()
        return response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers an undecodable body on requests versions whose
        # Response.json() does not raise a RequestException subclass, so the
        # "return 0 on any failure" contract holds there too.
        print(f"API调用失败: {str(e)}")
        return 0
def QC_status(run: str) -> bool:
    """Report whether the QC record for ``run`` is available from the API.

    Returns True only when the API reply is truthy and contains the key
    ``"run"``; a failed or empty reply yields False.
    """
    payload = call_api("GET", f"/Intermediate/common_fastp/{run}", {})
    if not payload:
        return False
    return "run" in payload
def mapping_status(sample_id: str) -> tuple:
    """Fetch the mapping record of a sample and return ``(ready, mean_depth)``.

    Returns ``(True, depth)`` when the API reply contains ``"sample_id"``,
    where ``depth`` is the reply's ``mean_depth`` converted to float
    (0 when the key is absent). Returns ``(False, -1.0)`` when the reply is
    falsy or lacks ``"sample_id"``.
    """
    record = call_api("GET", f"/Intermediate/srWGS_mapping/{sample_id}", {})
    if not record or "sample_id" not in record:
        return (False, -1.0)
    depth = float(record.get('mean_depth', 0))
    return (True, depth)
def calling_status(sample_id: str) -> bool:
    """Report whether the calling record for a sample is available from the API.

    Returns True only when the API reply is truthy and contains the key
    ``"sample_id"``; a failed or empty reply yields False.
    """
    payload = call_api("GET", f"/Intermediate/srWGS_calling/{sample_id}", {})
    if not payload:
        return False
    return "sample_id" in payload
import os
import shutil
import logging
from time import sleep
from typing import Tuple
# Logging configuration: records at INFO and above are written both to
# file_cleaner.log (relative to the working directory) and to a stream handler.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('file_cleaner.log'),
        logging.StreamHandler()
    ]
)
# Path configuration: every working directory is resolved under BASE_DIR.
# NOTE(review): "anaylysis" looks like a misspelling of "analysis"; presumably
# it matches the directory name on disk — confirm before changing it.
BASE_DIR = "/share/home/lxl/08Users/yhfu/04Projects/006_idata_sheep"
INPUT_DIR = os.path.join(BASE_DIR, "data/input")
RESULTS_DIR = os.path.join(BASE_DIR, "anaylysis/work/results")
OK_RESULTS_DIR = os.path.join(BASE_DIR, "anaylysis/ok_results")
# API retry configuration used by call_with_retry:
# MAX_RETRIES is the number of attempts, RETRY_DELAY the sleep() argument
# (seconds) applied after a failed attempt.
MAX_RETRIES = 3
RETRY_DELAY = 5
def call_with_retry(func, *args, max_retries=None, retry_delay=None):
    """Call ``func(*args)``, retrying when it raises.

    Args:
        func: Callable to invoke.
        *args: Positional arguments forwarded to ``func``.
        max_retries: Total number of attempts; defaults to ``MAX_RETRIES``.
        retry_delay: Seconds to wait between attempts; defaults to
            ``RETRY_DELAY``.

    Returns:
        Whatever ``func`` returns on the first attempt that does not raise.

    Raises:
        Exception: When every attempt raised. The last underlying error is
            attached as ``__cause__``.
    """
    attempts = MAX_RETRIES if max_retries is None else max_retries
    delay = RETRY_DELAY if retry_delay is None else retry_delay
    last_error = None
    for attempt in range(1, attempts + 1):
        try:
            return func(*args)
        except Exception as e:
            last_error = e
            logging.warning(f"接口调用失败(尝试 {attempt}/{attempts}): {str(e)}")
            # Waiting is only useful when another attempt follows.
            if attempt < attempts:
                sleep(delay)
    # Not every callable (e.g. functools.partial) carries __name__.
    name = getattr(func, "__name__", repr(func))
    raise Exception(f"接口调用失败超过最大重试次数: {name}") from last_error
def safe_remove(path: str):
    """Delete a file, symlink or directory tree without raising.

    Symbolic links are unlinked themselves (their target is left alone);
    ``shutil.rmtree`` refuses to operate on a symlink, so a link to a
    directory must not be routed to it. A path that does not exist is
    ignored. Any failure is logged instead of propagated.

    Args:
        path: Filesystem path to remove.
    """
    try:
        if os.path.islink(path) or os.path.isfile(path):
            os.remove(path)
            logging.info(f"已删除文件: {path}")
        elif os.path.isdir(path):
            shutil.rmtree(path)
            logging.info(f"已删除目录: {path}")
    except Exception as e:
        # Best-effort cleanup: report the problem and let the caller continue.
        logging.error(f"删除操作失败: {path} - {str(e)}")
def move_to_ok(src: str, category: str, ok_root=None):
    """Move a file or directory into ``<ok_root>/<category>/``.

    The destination keeps the source's base name. If something already
    exists at the destination the move is refused and logged, because
    ``shutil.move`` would otherwise place the source *inside* an existing
    destination directory. Move failures are logged, not raised.

    Args:
        src: Path of the file or directory to move.
        category: Sub-directory name under the ok-results root.
        ok_root: Root directory for accepted results; defaults to
            ``OK_RESULTS_DIR``.
    """
    root = OK_RESULTS_DIR if ok_root is None else ok_root
    dest_dir = os.path.join(root, category)
    os.makedirs(dest_dir, exist_ok=True)
    # normpath drops a trailing separator, which would make basename empty.
    dest = os.path.join(dest_dir, os.path.basename(os.path.normpath(src)))
    if os.path.lexists(dest):
        logging.error(f"移动失败: {src} → {dest} - 目标已存在")
        return
    try:
        shutil.move(src, dest)
        logging.info(f"成功移动: {src} → {dest}")
    except Exception as e:
        logging.error(f"移动失败: {src} → {dest} - {str(e)}")
def process_input():
    """Delete raw input sample directories whose runs have all passed QC.

    For every sub-directory of ``INPUT_DIR`` the run IDs are taken from the
    part of each ``*.fastq.gz`` file name before the first underscore. The
    directory is removed only when it contains at least one run and
    ``QC_status`` is true for every run. A failure while handling one sample
    is logged and does not stop the remaining samples.
    """
    for sample in os.listdir(INPUT_DIR):
        sample_path = os.path.join(INPUT_DIR, sample)
        if not os.path.isdir(sample_path):
            continue
        try:
            # Collect the distinct run IDs present in this sample directory.
            runs = {
                name.split('_')[0]
                for name in os.listdir(sample_path)
                if name.endswith('.fastq.gz')
            }
            # Without any fastq file there is nothing to verify; keep the directory.
            if not runs:
                continue
            if all(call_with_retry(QC_status, run) for run in runs):
                safe_remove(sample_path)
        except Exception as e:
            logging.error(f"处理input样本失败: {sample} - {str(e)}")
def process_qc():
    """Archive QC results of samples whose mapping record is available.

    For every sample directory under ``RESULTS_DIR/QC`` with a ready
    mapping status, entries ending in ``.gz`` are deleted and the directory
    is then moved to the ``QC`` category of the ok-results area. Entries
    that are not directories are skipped. A failure on one sample is logged
    and does not stop the remaining samples.
    """
    qc_dir = os.path.join(RESULTS_DIR, "QC")
    for sample in os.listdir(qc_dir):
        sample_path = os.path.join(qc_dir, sample)
        if not os.path.isdir(sample_path):
            continue
        try:
            ready, _ = call_with_retry(mapping_status, sample)
            if not ready:
                continue
            # Drop the compressed files before archiving the directory.
            for name in os.listdir(sample_path):
                if name.endswith('.gz'):
                    safe_remove(os.path.join(sample_path, name))
            move_to_ok(sample_path, "QC")
        except Exception as e:
            logging.error(f"处理QC样本失败: {sample} - {str(e)}")
def process_mapping(min_depth: float = 20):
    """Archive or delete mapping results once calling has finished.

    For every entry under ``RESULTS_DIR/mapping`` whose calling status is
    ready, the mapping record is looked up. Entries with a mean depth of at
    least ``min_depth`` are moved to the ``mapping`` category of the
    ok-results area; entries below it are deleted. When the mapping record
    cannot be obtained the entry is left untouched, so that a failed lookup
    (reported as depth -1.0) is never mistaken for low depth. A failure on
    one sample is logged and does not stop the remaining samples.

    Args:
        min_depth: Minimum mean depth required to keep a mapping result.
    """
    mapping_dir = os.path.join(RESULTS_DIR, "mapping")
    for sample in os.listdir(mapping_dir):
        sample_path = os.path.join(mapping_dir, sample)
        try:
            if not call_with_retry(calling_status, sample):
                continue
            mapping_ready, depth = call_with_retry(mapping_status, sample)
            if not mapping_ready:
                # Depth is unknown here; deleting would risk losing valid data.
                logging.warning(f"未获取到mapping状态,跳过样本: {sample}")
                continue
            if depth >= min_depth:
                move_to_ok(sample_path, "mapping")
            else:
                safe_remove(sample_path)
        except Exception as e:
            logging.error(f"处理mapping样本失败: {sample} - {str(e)}")
def process_calling():
    """Move calling results of finished samples to the ok-results area.

    Each entry under ``RESULTS_DIR/calling`` whose calling status is ready
    is moved to the ``calling`` category. A failure on one sample is logged
    and the loop continues with the next one.
    """
    calling_dir = os.path.join(RESULTS_DIR, "calling")
    for sample in os.listdir(calling_dir):
        sample_path = os.path.join(calling_dir, sample)
        try:
            finished = call_with_retry(calling_status, sample)
            if not finished:
                continue
            move_to_ok(sample_path, "calling")
        except Exception as err:
            logging.error(f"处理calling样本失败: {sample} - {str(err)}")
def main():
    """Run the cleanup stages in order: input, QC, mapping, calling.

    An exception escaping any stage is logged and ends the run; the stages
    after it are not executed.
    """
    logging.info("=== 开始文件清理流程 ===")
    try:
        for stage in (process_input, process_qc, process_mapping, process_calling):
            stage()
        logging.info("=== 所有处理完成 ===")
    except Exception as err:
        logging.error(f"主流程异常终止: {str(err)}")
# Run the cleanup only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
本站总访问量 次
Authors: