高效获取滑动窗口序列(大数据集)

问题描述 投票:0回答:1

我存储的数据集只是DNA序列的坐标。

df:

chr   start    stop   label  
chr1   9000    9100    1  
chr1   8803    8903     1  
chr1   8903    9000     0  

我的目标是通过在每个坐标周围创建一个滑动窗口来捕获上下文序列来扩展原始数据集。

new_df:

chr    start         stop        label  
chr1   9000-5000    9000+5000      1  
chr1   9001-5000    9001+5000      1  
chr1   9002-5000    9002+5000      1  
...
chr1   9100-5000    9100+5000      1  
...
  1. 创建滑动窗口:对于原始数据集中的每个条目,通过在起始坐标和终止坐标两侧创建 5000 个核苷酸的滑动窗口来生成新行。这有效地捕获了每个原始坐标周围的序列上下文。
  2. 扩展数据集:对于每个原始条目,创建新行,其中每行代表 5000 核苷酸上下文窗口中的特定位置。例如,如果原始起始值为 9000,我将生成起始值从 9000-5000 到 9000+5000 的行,并类似地调整停止值。现在每个序列的长度为 10,001 个字符。

使用此功能:

def expand_coordinates(element_locs, context=3):
    # Vectorized expansion of coordinates
    start = element_locs['Start'].astype(int)
    end = element_locs['End'].astype(int)
    
    expanded_data = []
    for idx, row in element_locs.iterrows():
        chr_name = row['Chromosome']
        chr_start = start[idx]
        chr_end = end[idx]
        
        for i in range(chr_start, chr_end + 1):
            expanded_data.append({
                'Chromosome': chr_name,
                'Start': max((i - 1) - context, 0),
                'End': min(i + context, max_sizes[chr_name])
            })
    
    expanded_df = pd.DataFrame(expanded_data)
    return expanded_df
  1. 获取序列:使用 new_df 中的扩展坐标从数据集中获取相应的 DNA 序列。
def get_element_seqs(element_locs, context=3):
    expanded_df = expand_coordinates(element_locs, context=context)
    # Optimize genome fetching
    genome = pysam.Fastafile(ref_genome)
    def fetch_sequences(row):
        return genome.fetch(row['Chromosome'], row['Start'], row['End'])
    # Fetch sequences in a vectorized way
    expanded_df['sequence'] = expanded_df.apply(fetch_sequences, axis=1)
    return element_seqs
  1. 对序列进行标记化
dataset = Dataset.from_pandas(element_final[['Chromosome', 'sequence', 'label']]) 

dataset = dataset.shuffle(seed=42)
tokenizer = AutoTokenizer.from_pretrained(f"InstaDeepAI/nucleotide-transformer-500m-human-ref")
def tokenize_function(examples):
    outputs = tokenizer.batch_encode_plus(examples["sequence"], return_tensors="pt", truncation=False, padding=False, max_length=80)
    return outputs
    
# Creating tokenized  dataset
tokenized_dataset = dataset.map(
    tokenize_function,
    batched=True, batch_size=2000)

  1. 从令牌中获取嵌入
input_file = f"tokenized_elements/tokenized_{ELEMENT_LABEL}/{filename}.arrow"

# Load input data
d1 = Dataset.from_file(input_file)

def embed_function(examples):
    torch.cuda.empty_cache()
    gc.collect()

    inputs = torch.tensor(examples['input_ids'])  # Convert to tensor
    inputs = inputs.to(device)

    with torch.no_grad():
        outputs = model(input_ids=inputs, output_hidden_states=True)

    # Step 3: Extract the embeddings
    hidden_states = outputs.hidden_states  # List of hidden states from all layers
    embeddings = hidden_states[-1]  # Assuming you want embeddings from the last layer
    averaged_embeddings = torch.mean(embeddings, dim=1)  # Calculate mean along dimension 1 (the dimension with size 86)
    averaged_embeddings = averaged_embeddings.to(torch.float32)  # Ensure float32 data type
    return {'embeddings': averaged_embeddings}

# Map embeddings function to input data
embeddings = d1.map(embed_function, batched=True, batch_size=1550)
embeddings = embeddings.remove_columns(["input_ids", "attention_mask"])

# Save embeddings to disk
output_dir = f"embedded_elements/embeddings_{ELEMENT_LABEL}/{filename}"  # Assuming ELEMENT_LABEL is defined elsewhere
  1. 将嵌入插入 XGBoost

这最终会给我带来巨大的数据集,导致我的代码崩溃(例如,我从 70 万行开始,然后扩展到 10 亿行)。我一直在使用 pandas,所以也许这也是问题所在?另一个问题是我认为我没有使用批处理? 不幸的是,我的代码在步骤 2+3 之间不断崩溃。我认为我需要实现批处理,但我不确定一切将如何工作,因为我最终需要将输出提供给法学硕士。

python pandas dataframe bigdata bioinformatics
1个回答
0
投票

重写

expand_coordinates
函数,因为您的进程在步骤 2 和 3 之间失败。在步骤 3 中,
expanded_df['sequence'] = expanded_df.apply(fetch_sequences, axis=1)
应替换为
expanded_df.merge(fetch_sequences: pd.DataFrame, ...)
之类的内容,因为合并是矢量化的。 认为将任何函数放入 apply 中都是矢量化方法,这是一种误解!

def expand_coordinates(element_locs: pd.DataFrame, context: int = 3):
    # create a column of ranges (memory efficient since ranges are lazy)
    element_locs['range'] = element_locs.apply(lambda row: range(row['start'], row['stop'] + 1), axis=1)
    # explode is a vectorized operation
    element_locs = element_locs.explode('range')
    
    element_locs['start'] = np.maximum(element_locs['start']-context, 0)
    element_locs['stop'] = np.minimum(element_locs['stop']+context, 5000)  # <-- 5000 is an arbitrary maximum for demo

    return element_locs

我进行了一些负载测试,效果很好(尽管远没有达到您所处理的规模,因为我在我的笔记本电脑上测试它 - Ubuntu 16GB)。结果如下。请注意 - 由于我是通过随机生成器生成数据,因此测试中存在很大的可变性。这里的关键部分是爆炸因子 (

target rows/initial rows
),它仅取决于开始值和停止值(在我的情况下是随机生成的)。

Initial rows: 100
Generation time: 0.0 sec
Target rows: 511,628
Explosion time: 0.1 sec


Initial rows: 1,000
Generation time: 0.0 sec
Target rows: 4,965,974
Explosion time: 0.77 sec


Initial rows: 10,000
Generation time: 0.0 sec
Target rows: 50,074,976
Explosion time: 7.32 sec


Initial rows: 11,000
Generation time: 0.0 sec
Target rows: 54,922,952
Explosion time: 8.1 sec


Initial rows: 12,000
Generation time: 0.0 sec
Target rows: 59,966,220
Explosion time: 8.98 sec


Initial rows: 15,000
Generation time: 0.0 sec
Target rows: 75,115,987
Explosion time: 11.51 sec


Initial rows: 20,000
Generation time: 0.0 sec
Target rows: 100,010,662

Process finished with exit code 137 (interrupted by signal 9:SIGKILL)

就您正在处理的规模而言,PANDAS 绝对是一个坏主意。如果您有权访问相关基础设施,请查看 Spark。

© www.soinside.com 2019 - 2024. All rights reserved.