效果好不好,试一试就知道了。
比 Python 快;
兼容 NumPy;
可以进行数据并行多线程处理;
与执行数据并行多线程的所有其他包兼容;
安全。
[build-system]requires = ["maturin==0.12.5"]build-backend = "maturin"
[dependencies]thiserror = "1.0.30"ndarray-npy = { version = "0.8.1", default-features = false }rayon = "1.5.1"numpy = "0.15.0"ndarray = { version = "0.15.4", features = ["approx", "rayon"] }pyo3 = { version = "0.15.1", features = ["extension-module"] }[dev-dependencies]temp_testdir = "0.2.3"
mod python_module;mod tests;
#[pyfn(m)]#[pyo3(name = "read_f64")]fn read_f64_py(_py: Python<'_>,filename: &str,iid_count: usize,sid_count: usize,count_a1: bool,iid_index: &PyArray1<usize>,sid_index: &PyArray1<usize>,val: &PyArray2<f64>,num_threads: usize,) -> Result<(), PyErr> {let iid_index = iid_index.readonly();let sid_index = sid_index.readonly();let mut val = unsafe { val.as_array_mut() };let ii = &iid_index.as_slice()?;let si = &sid_index.as_slice()?;create_pool(num_threads)?.install(|| {read_no_alloc(filename,iid_count,sid_count,count_a1,ii,si,f64::NAN,&mut val,)})?;Ok(())}
let iid_index = iid_index.readonly();let ii = &iid_index.as_slice()?;
let mut val = unsafe { val.as_array_mut() };
from .bed_reader import [...] read_f64 [...]
def read([...]):[...]val = np.zeros((len(iid_index), len(sid_index)), order=order, dtype=dtype)[...]reader = read_f64[...]reader(str(self.filepath),iid_count=self.iid_count,sid_count=self.sid_count,count_a1=self.count_A1,iid_index=iid_index,sid_index=sid_index,val=val,num_threads=num_threads,)[...]return val
let mut buf_reader = BufReader::new(File::open(filename)?);
use thiserror::Error;.../// BedErrorPlus enumerates all possible errors/// returned by this library./// Based on https://nick.groenen.me/posts/rust-error-handling/#the-library-error-type#[derive(Error, Debug)]pub enum BedErrorPlus {#[error(transparent)]IOError(#[from] std::io::Error),#[error(transparent)]BedError(#[from] BedError),#[error(transparent)]ThreadPoolError(#[from] ThreadPoolBuildError),}
impl std::convert::From<BedErrorPlus> for PyErr {fn from(err: BedErrorPlus) -> PyErr {match err {BedErrorPlus::IOError(_) => PyIOError::new_err(err.to_string()),BedErrorPlus::ThreadPoolError(_) => PyValueError::new_err(err.to_string()),BedErrorPlus::BedError(BedError::IidIndexTooBig(_))| BedErrorPlus::BedError(BedError::SidIndexTooBig(_))| BedErrorPlus::BedError(BedError::IndexMismatch(_, _, _, _))| BedErrorPlus::BedError(BedError::IndexesTooBigForFiles(_, _))| BedErrorPlus::BedError(BedError::SubsetMismatch(_, _, _, _)) => {PyIndexError::new_err(err.to_string())}_ => PyValueError::new_err(err.to_string()),}}}
if (BED_FILE_MAGIC1 != bytes_vector[0]) || (BED_FILE_MAGIC2 != bytes_vector[1]) {return Err(BedError::IllFormed(filename.to_string()).into());}
use thiserror::Error;[...]// https://docs.rs/thiserror/1.0.23/thiserror/#[derive(Error, Debug, Clone)]pub enum BedError {#[error("Ill-formed BED file. BED file header is incorrect or length is wrong.'{0}'")]IllFormed(String),[...]}
DNA 位置的二进制数据;
输出数组的列。
[... not shown, read bytes for DNA location's data ...]// Zip in the column of the output array.zip(out_val.axis_iter_mut(nd::Axis(1)))// In parallel, decompress the iid info and put it in its column.par_bridge() // This seems faster that parallel zip.try_for_each(|(bytes_vector_result, mut col)| {match bytes_vector_result {Err(e) => Err(e),Ok(bytes_vector) => {for out_iid_i in 0..out_iid_count {let in_iid_i = iid_index[out_iid_i];let i_div_4 = in_iid_i / 4;let i_mod_4 = in_iid_i % 4;let genotype_byte: u8 = (bytes_vector[i_div_4] >> (i_mod_4 * 2)) & 0x03;col[out_iid_i] = from_two_bits_to_value[genotype_byte as usize];}Ok(())}}})?;
[...]let mut result_list: Vec<Result<(), BedError>> = vec![Ok(()); sid_count];nd::par_azip!((mut stats_row in stats.axis_iter_mut(nd::Axis(0)),&n_observed in &n_observed_array,&sum_s in &sum_s_array,&sum2_s in &sum2_s_array,result_ptr in &mut result_list){[...some code not shown...]});// Check the result list for errorsresult_list.par_iter().try_for_each(|x| (*x).clone())?;[...]
def get_num_threads(num_threads=None):if num_threads is not None:return num_threadsif "PST_NUM_THREADS" in os.environ:return int(os.environ["PST_NUM_THREADS"])if "NUM_THREADS" in os.environ:return int(os.environ["NUM_THREADS"])if "MKL_NUM_THREADS" in os.environ:return int(os.environ["MKL_NUM_THREADS"])return multiprocessing.cpu_count()
pub fn create_pool(num_threads: usize) -> Result<rayon::ThreadPool, BedErrorPlus> {match rayon::ThreadPoolBuilder::new().num_threads(num_threads).build(){Err(e) => Err(e.into()),Ok(pool) => Ok(pool),}}
[]create_pool(num_threads)?.install(|| {read_no_alloc(filename,[])})?;[]
def read([...]dtype: Optional[Union[type, str]] = "float32",[...])[...]if dtype == np.int8:reader = read_i8elif dtype == np.float64:reader = read_f64elif dtype == np.float32:reader = read_f32else:raise ValueError(known, only""'int8', 'float32', and 'float64' are allowed.")reader(str(self.filepath),[...])
#[pyfn(m)]#[pyo3(name = "read_f64")]fn read_f64_py([...]val: &PyArray2<f64>,num_threads: usize,-> Result<(), PyErr> {[...]let mut val = unsafe { val.as_array_mut() };[...]read_no_alloc([...]f64::NAN,val,)[...]}
fn read_no_alloc<TOut: Copy + Default + From<i8> + Debug + Sync + Send>(filename: &str,[...]missing_value: TOut,val: &mut nd::ArrayViewMut2<'_, TOut>,-> Result<(), BedErrorPlus> {[...]}
© THE END
转载请联系本公众号获得授权
投稿或寻求报道:content@jiqizhixin.com