The dataset I have stored is just coordinates of DNA sequences.
df:
chr start stop label
chr1 9000 9100 1
chr1 8803 8903 1
chr1 8903 9000 0
My goal is to expand the original dataset by creating a sliding window around each coordinate to capture the surrounding context sequence.
new_df:
chr start stop label
chr1 9000-5000 9000+5000 1
chr1 9001-5000 9001+5000 1
chr1 9002-5000 9002+5000 1
...
chr1 9100-5000 9100+5000 1
...
Using the following code:
def expand_coordinates(element_locs, context=3):
    # Vectorized expansion of coordinates
    start = element_locs['Start'].astype(int)
    end = element_locs['End'].astype(int)
    expanded_data = []
    for idx, row in element_locs.iterrows():
        chr_name = row['Chromosome']
        chr_start = start[idx]
        chr_end = end[idx]
        for i in range(chr_start, chr_end + 1):
            expanded_data.append({
                'Chromosome': chr_name,
                'Start': max((i - 1) - context, 0),
                'End': min(i + context, max_sizes[chr_name])
            })
    expanded_df = pd.DataFrame(expanded_data)
    return expanded_df
def get_element_seqs(element_locs, context=3):
    expanded_df = expand_coordinates(element_locs, context=context)
    # Optimize genome fetching
    genome = pysam.Fastafile(ref_genome)
    def fetch_sequences(row):
        return genome.fetch(row['Chromosome'], row['Start'], row['End'])
    # Fetch sequences in a vectorized way
    expanded_df['sequence'] = expanded_df.apply(fetch_sequences, axis=1)
    return expanded_df
dataset = Dataset.from_pandas(element_final[['Chromosome', 'sequence', 'label']])
dataset = dataset.shuffle(seed=42)
tokenizer = AutoTokenizer.from_pretrained("InstaDeepAI/nucleotide-transformer-500m-human-ref")
def tokenize_function(examples):
    outputs = tokenizer.batch_encode_plus(
        examples["sequence"], return_tensors="pt",
        truncation=False, padding=False, max_length=80)
    return outputs
# Creating tokenized dataset
tokenized_dataset = dataset.map(
    tokenize_function,
    batched=True, batch_size=2000)
input_file = f"tokenized_elements/tokenized_{ELEMENT_LABEL}/{filename}.arrow"
# Load input data
d1 = Dataset.from_file(input_file)
def embed_function(examples):
    torch.cuda.empty_cache()
    gc.collect()
    inputs = torch.tensor(examples['input_ids'])  # Convert to tensor
    inputs = inputs.to(device)
    with torch.no_grad():
        outputs = model(input_ids=inputs, output_hidden_states=True)
    # Step 3: Extract the embeddings
    hidden_states = outputs.hidden_states  # List of hidden states from all layers
    embeddings = hidden_states[-1]  # Assuming you want embeddings from the last layer
    averaged_embeddings = torch.mean(embeddings, dim=1)  # Calculate mean along dimension 1 (the dimension with size 86)
    averaged_embeddings = averaged_embeddings.to(torch.float32)  # Ensure float32 data type
    return {'embeddings': averaged_embeddings}
# Map embeddings function to input data
embeddings = d1.map(embed_function, batched=True, batch_size=1550)
embeddings = embeddings.remove_columns(["input_ids", "attention_mask"])
# Save embeddings to disk
output_dir = f"embedded_elements/embeddings_{ELEMENT_LABEL}/{filename}" # Assuming ELEMENT_LABEL is defined elsewhere
This ends up giving me a huge dataset that crashes my code (for example, I start with 700k rows and expand to 1 billion rows). I have been using pandas throughout, so maybe that is part of the problem? Another issue is that I don't think I am using batching. Unfortunately, my code keeps crashing between steps 2 and 3. I think I need to implement batching, but I am not sure how it would all fit together, since I ultimately need to feed the output to an LLM.
Rewrite the expand_coordinates function, since your process fails between steps 2 and 3. In step 3, expanded_df['sequence'] = expanded_df.apply(fetch_sequences, axis=1) should be replaced with something like expanded_df.merge(fetch_sequences: pd.DataFrame, ...), because merges are vectorized (a sketch of that follows the rewritten function below). Thinking that putting any function inside apply makes it a vectorized approach is a misconception!
import numpy as np
import pandas as pd

def expand_coordinates(element_locs: pd.DataFrame, context: int = 3):
    # create a column of ranges (memory efficient since ranges are lazy)
    element_locs['range'] = element_locs.apply(
        lambda row: range(row['start'], row['stop'] + 1), axis=1)
    # explode is a vectorized operation: one row per position in the range
    element_locs = element_locs.explode('range')
    element_locs['range'] = element_locs['range'].astype(int)  # explode yields an object column; cast back to int
    # window each exploded position, clipping at the boundaries
    element_locs['start'] = np.maximum(element_locs['range'] - context, 0)
    element_locs['stop'] = np.minimum(element_locs['range'] + context, 5000)  # <-- 5000 is an arbitrary maximum for demo
    return element_locs
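As for step 3, here is a rough sketch of what a merge-based fetch could look like. It is untested against your data and assumes pysam, the ref_genome path, and the Chromosome/Start/End column names from your code; the fetch itself is still a Python-level loop (I am not aware of a bulk-fetch API in pysam), but it runs only over unique windows, and the join back onto the expanded frame is a vectorized merge rather than a per-row apply:
import pandas as pd
import pysam

def fetch_sequences(expanded_df: pd.DataFrame, ref_genome: str) -> pd.DataFrame:
    # fetch each unique window once; duplicate windows are filled in by the merge below
    genome = pysam.FastaFile(ref_genome)
    unique_windows = expanded_df[['Chromosome', 'Start', 'End']].drop_duplicates().copy()
    unique_windows['sequence'] = [
        genome.fetch(chrom, start, end)
        for chrom, start, end in unique_windows.itertuples(index=False)
    ]
    return unique_windows

# vectorized join instead of expanded_df.apply(fetch_sequences, axis=1):
# expanded_df = expanded_df.merge(
#     fetch_sequences(expanded_df, ref_genome),
#     on=['Chromosome', 'Start', 'End'], how='left')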
I did some load testing of the rewritten expand_coordinates and it held up well (although nowhere near the scale you are dealing with, since I tested it on my laptop - Ubuntu, 16 GB RAM). The results are below. Note that because the test data comes from a random generator, there is a lot of variability between runs. The key quantity here is the explosion factor (target rows / initial rows), which depends only on the start and stop values (randomly generated in my case).
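For reference, the explosion factor can be estimated up front from the coordinates alone, before exploding anything (a one-line sketch, assuming the lowercase start/stop columns from your example df):
# expected explosion factor = total expanded rows / initial rows
explosion_factor = (df['stop'] - df['start'] + 1).sum() / len(df)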
Initial rows: 100
Generation time: 0.0 sec
Target rows: 511,628
Explosion time: 0.1 sec
Initial rows: 1,000
Generation time: 0.0 sec
Target rows: 4,965,974
Explosion time: 0.77 sec
Initial rows: 10,000
Generation time: 0.0 sec
Target rows: 50,074,976
Explosion time: 7.32 sec
Initial rows: 11,000
Generation time: 0.0 sec
Target rows: 54,922,952
Explosion time: 8.1 sec
Initial rows: 12,000
Generation time: 0.0 sec
Target rows: 59,966,220
Explosion time: 8.98 sec
Initial rows: 15,000
Generation time: 0.0 sec
Target rows: 75,115,987
Explosion time: 11.51 sec
Initial rows: 20,000
Generation time: 0.0 sec
Target rows: 100,010,662
Process finished with exit code 137 (interrupted by signal 9:SIGKILL)
At the scale you are working with, pandas is definitely a bad idea. If you have access to the relevant infrastructure, look into Spark.
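To make that concrete, here is a minimal PySpark sketch of the same per-base expansion (untested at your scale; it assumes a local SparkSession, the chr/start/stop/label columns from your example df, and the +/-5000 context from your target output - clipping against the actual chromosome sizes is left out):
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.appName("expand_coordinates").getOrCreate()

df = spark.createDataFrame(
    [("chr1", 9000, 9100, 1), ("chr1", 8803, 8903, 1), ("chr1", 8903, 9000, 0)],
    ["chr", "start", "stop", "label"],
)

context = 5000
expanded = (
    df.withColumn("pos", F.explode(F.sequence("start", "stop")))   # one row per base
      .withColumn("win_start", F.greatest(F.col("pos") - context, F.lit(0)))
      .withColumn("win_stop", F.col("pos") + context)
      .select("chr", "win_start", "win_stop", "label")
)
expanded.show(5)
The sequence + explode pair is the Spark analogue of the pandas explode above, with the difference that the expanded rows are spread across partitions instead of being materialized in a single process's memory.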