mirror of
https://github.com/Drop-OSS/droplet.git
synced 2025-11-10 04:22:16 +10:00
feat: work on version backend system
This commit is contained in:
8
.github/workflows/CI.yml
vendored
8
.github/workflows/CI.yml
vendored
@ -12,12 +12,8 @@ permissions:
|
||||
- main
|
||||
tags-ignore:
|
||||
- "**"
|
||||
paths-ignore:
|
||||
- "**/*.md"
|
||||
- LICENSE
|
||||
- "**/*.gitignore"
|
||||
- .editorconfig
|
||||
- docs/**
|
||||
paths:
|
||||
- package.json
|
||||
pull_request: null
|
||||
jobs:
|
||||
build:
|
||||
|
||||
@ -24,6 +24,7 @@ webpki = "0.22.4"
|
||||
ring = "0.17.14"
|
||||
tokio = { version = "1.45.1", features = ["fs", "io-util"] }
|
||||
tokio-util = { version = "0.7.15", features = ["codec"] }
|
||||
zip = "4.2.0"
|
||||
|
||||
[dependencies.x509-parser]
|
||||
version = "0.17.0"
|
||||
|
||||
@ -52,4 +52,38 @@ test("numerous small file", async (t) => {
|
||||
}
|
||||
|
||||
fs.rmSync(dirName, { recursive: true });
|
||||
});
|
||||
});
|
||||
|
||||
test.skip("performance test", async (t) => {
|
||||
t.timeout(5 * 60 * 1000);
|
||||
const dirName = "./.test/pt";
|
||||
if (fs.existsSync(dirName)) fs.rmSync(dirName, { recursive: true });
|
||||
fs.mkdirSync(dirName, { recursive: true });
|
||||
|
||||
const fileSize = 1 * 1000 * 1000 * 1000; // 1GB
|
||||
|
||||
const randomStream = fs.createReadStream("/dev/random", {
|
||||
start: 0,
|
||||
end: fileSize,
|
||||
});
|
||||
const outputStream = fs.createWriteStream(path.join(dirName, "file.bin"));
|
||||
await new Promise((r) => {
|
||||
randomStream.pipe(outputStream);
|
||||
randomStream.on("end", r);
|
||||
});
|
||||
|
||||
const start = Date.now();
|
||||
await new Promise((r, e) =>
|
||||
generateManifest(
|
||||
dirName,
|
||||
(_, __) => {},
|
||||
(_, __) => {},
|
||||
(err, manifest) => (err ? e(err) : r(manifest))
|
||||
)
|
||||
);
|
||||
const end = Date.now();
|
||||
|
||||
t.pass(`Took ${end - start}ms to process ${fileSize / (1000 * 1000)}MB`);
|
||||
|
||||
fs.rmSync(dirName, { recursive: true });
|
||||
});
|
||||
|
||||
@ -52,7 +52,6 @@ test("read file offset", async (t) => {
|
||||
fs.mkdirSync(dirName, { recursive: true });
|
||||
|
||||
const testString = "0123456789";
|
||||
|
||||
fs.writeFileSync(dirName + "/TESTFILE", testString);
|
||||
|
||||
const stream = droplet.readFile(dirName, "TESTFILE", 1, 4);
|
||||
@ -64,7 +63,7 @@ test("read file offset", async (t) => {
|
||||
finalString += String.fromCharCode.apply(null, chunk);
|
||||
}
|
||||
|
||||
const expectedString = testString.slice(1, 5);
|
||||
const expectedString = testString.slice(1, 4);
|
||||
|
||||
t.assert(finalString == expectedString, "file strings don't match");
|
||||
fs.rmSync(dirName, { recursive: true });
|
||||
|
||||
@ -1,181 +0,0 @@
|
||||
#[cfg(unix)]
|
||||
use std::os::unix::fs::PermissionsExt;
|
||||
use std::{
|
||||
fs::{self, metadata, File},
|
||||
io::{self, BufReader, ErrorKind, Read, Seek},
|
||||
path::{Path, PathBuf},
|
||||
task::Poll,
|
||||
};
|
||||
|
||||
use napi::{
|
||||
bindgen_prelude::*,
|
||||
tokio_stream::{Stream, StreamExt},
|
||||
};
|
||||
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeekExt, Take};
|
||||
use tokio_util::{
|
||||
bytes::BytesMut,
|
||||
codec::{BytesCodec, FramedRead},
|
||||
};
|
||||
|
||||
fn _list_files(vec: &mut Vec<PathBuf>, path: &Path) {
|
||||
if metadata(path).unwrap().is_dir() {
|
||||
let paths = fs::read_dir(path).unwrap();
|
||||
for path_result in paths {
|
||||
let full_path = path_result.unwrap().path();
|
||||
if metadata(&full_path).unwrap().is_dir() {
|
||||
_list_files(vec, &full_path);
|
||||
} else {
|
||||
vec.push(full_path);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct VersionFile {
|
||||
pub relative_filename: String,
|
||||
pub permission: u32,
|
||||
}
|
||||
|
||||
pub trait VersionBackend: 'static {
|
||||
fn list_files(&self, path: &Path) -> Vec<VersionFile>;
|
||||
fn reader(&self, file: &VersionFile) -> Option<File>;
|
||||
}
|
||||
|
||||
pub struct PathVersionBackend {
|
||||
pub base_dir: PathBuf,
|
||||
}
|
||||
impl VersionBackend for PathVersionBackend {
|
||||
fn list_files(&self, path: &Path) -> Vec<VersionFile> {
|
||||
let mut vec = Vec::new();
|
||||
_list_files(&mut vec, path);
|
||||
|
||||
let mut results = Vec::new();
|
||||
|
||||
for pathbuf in vec.iter() {
|
||||
let file = File::open(pathbuf.clone()).unwrap();
|
||||
let relative = pathbuf.strip_prefix(path).unwrap();
|
||||
let metadata = file.try_clone().unwrap().metadata().unwrap();
|
||||
let permission_object = metadata.permissions();
|
||||
let permissions = {
|
||||
let perm: u32;
|
||||
#[cfg(target_family = "unix")]
|
||||
{
|
||||
perm = permission_object.mode();
|
||||
}
|
||||
#[cfg(not(target_family = "unix"))]
|
||||
{
|
||||
perm = 0
|
||||
}
|
||||
perm
|
||||
};
|
||||
|
||||
results.push(VersionFile {
|
||||
relative_filename: relative.to_string_lossy().to_string(),
|
||||
permission: permissions,
|
||||
});
|
||||
}
|
||||
|
||||
results
|
||||
}
|
||||
|
||||
fn reader(&self, file: &VersionFile) -> Option<File> {
|
||||
let file = File::open(self.base_dir.join(file.relative_filename.clone())).ok()?;
|
||||
|
||||
return Some(file);
|
||||
}
|
||||
}
|
||||
|
||||
// Todo implementation for archives
|
||||
// Split into a separate impl for each type of archive
|
||||
pub struct ArchiveVersionBackend {}
|
||||
impl VersionBackend for ArchiveVersionBackend {
|
||||
fn list_files(&self, path: &Path) -> Vec<VersionFile> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn reader(&self, file: &VersionFile) -> Option<File> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
pub fn create_backend_for_path(path: &Path) -> Option<Box<(dyn VersionBackend)>> {
|
||||
let is_directory = path.is_dir();
|
||||
if is_directory {
|
||||
return Some(Box::new(PathVersionBackend {
|
||||
base_dir: path.to_path_buf(),
|
||||
}));
|
||||
};
|
||||
|
||||
/*
|
||||
Insert checks for whatever backend you like
|
||||
*/
|
||||
|
||||
None
|
||||
}
|
||||
|
||||
#[napi]
|
||||
pub fn has_backend_for_path(path: String) -> bool {
|
||||
let path = Path::new(&path);
|
||||
|
||||
let has_backend = create_backend_for_path(path).is_some();
|
||||
|
||||
has_backend
|
||||
}
|
||||
|
||||
#[napi]
|
||||
pub fn list_files(path: String) -> Vec<String> {
|
||||
let path = Path::new(&path);
|
||||
let backend = create_backend_for_path(path).unwrap();
|
||||
let files = backend.list_files(path);
|
||||
files.into_iter().map(|e| e.relative_filename).collect()
|
||||
}
|
||||
|
||||
#[napi]
|
||||
pub fn read_file(
|
||||
path: String,
|
||||
sub_path: String,
|
||||
env: &Env,
|
||||
start: Option<u32>,
|
||||
end: Option<u32>
|
||||
) -> Option<ReadableStream<'static, BufferSlice<'static>>> {
|
||||
let path = Path::new(&path);
|
||||
let backend = create_backend_for_path(path).unwrap();
|
||||
let version_file = VersionFile {
|
||||
relative_filename: sub_path,
|
||||
permission: 0, // Shouldn't matter
|
||||
};
|
||||
// Use `?` operator for cleaner error propagation from `Option`
|
||||
let mut reader = backend.reader(&version_file)?;
|
||||
|
||||
// Can't do this in tokio because it requires a .await, which we can't do here
|
||||
if let Some(start) = start {
|
||||
reader.seek(io::SeekFrom::Start(start as u64)).unwrap();
|
||||
}
|
||||
|
||||
// Convert std::fs::File to tokio::fs::File for async operations
|
||||
let reader = tokio::fs::File::from_std(reader);
|
||||
|
||||
|
||||
let boxed_reader: Box<dyn AsyncRead + Send + Unpin> = match end {
|
||||
Some(end_val) => Box::new(reader.take(end_val as u64)),
|
||||
None => Box::new(reader),
|
||||
};
|
||||
|
||||
|
||||
|
||||
// Create a FramedRead stream with BytesCodec for chunking
|
||||
|
||||
let stream = FramedRead::new(boxed_reader, BytesCodec::new())
|
||||
// Use StreamExt::map to transform each Result item
|
||||
.map(|result_item| {
|
||||
result_item
|
||||
// Apply Result::map to transform Ok(BytesMut) to Ok(Vec<u8>)
|
||||
.map(|bytes| bytes.to_vec())
|
||||
// Apply Result::map_err to transform Err(std::io::Error) to Err(napi::Error)
|
||||
.map_err(|e| napi::Error::from(e)) // napi::Error implements From<tokio::io::Error>
|
||||
});
|
||||
// Create the napi-rs ReadableStream from the tokio_stream::Stream
|
||||
// The unwrap() here means if stream creation fails, it will panic.
|
||||
// For a production system, consider returning Result<Option<...>> and handling this.
|
||||
Some(ReadableStream::create_with_stream_bytes(env, stream).unwrap())
|
||||
}
|
||||
@ -1,8 +1,9 @@
|
||||
#![deny(clippy::all)]
|
||||
#![feature(trait_alias)]
|
||||
|
||||
pub mod file_utils;
|
||||
pub mod manifest;
|
||||
pub mod ssl;
|
||||
pub mod version;
|
||||
|
||||
#[macro_use]
|
||||
extern crate napi_derive;
|
||||
|
||||
@ -1,19 +1,20 @@
|
||||
use std::{
|
||||
collections::HashMap, fs::File, io::{BufRead, BufReader}, path::Path, rc::Rc, sync::Arc, thread
|
||||
collections::HashMap,
|
||||
io::{BufRead, BufReader},
|
||||
path::Path,
|
||||
sync::Arc,
|
||||
thread,
|
||||
};
|
||||
|
||||
#[cfg(unix)]
|
||||
use std::os::unix::fs::PermissionsExt;
|
||||
|
||||
use napi::{
|
||||
bindgen_prelude::Function,
|
||||
threadsafe_function::{ThreadsafeFunction, ThreadsafeFunctionCallMode},
|
||||
Env, Error, Result,
|
||||
Result,
|
||||
};
|
||||
use serde_json::json;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::file_utils::create_backend_for_path;
|
||||
use crate::version::utils::create_backend_for_path;
|
||||
|
||||
|
||||
const CHUNK_SIZE: usize = 1024 * 1024 * 64;
|
||||
|
||||
@ -43,8 +44,8 @@ pub fn generate_manifest(
|
||||
) -> Result<(), String> {
|
||||
thread::spawn(move || {
|
||||
let base_dir = Path::new(&dir);
|
||||
let backend = create_backend_for_path(base_dir).unwrap();
|
||||
let files = backend.list_files(base_dir);
|
||||
let mut backend = create_backend_for_path(base_dir).unwrap();
|
||||
let files = backend.list_files();
|
||||
|
||||
// Filepath to chunk data
|
||||
let mut chunks: HashMap<String, ChunkData> = HashMap::new();
|
||||
@ -53,7 +54,7 @@ pub fn generate_manifest(
|
||||
let mut i: i32 = 0;
|
||||
|
||||
for version_file in files {
|
||||
let mut raw_reader= backend.reader(&version_file).unwrap();
|
||||
let raw_reader= backend.reader(&version_file).unwrap();
|
||||
let mut reader = BufReader::with_capacity(CHUNK_SIZE, raw_reader);
|
||||
|
||||
let mut chunk_data = ChunkData {
|
||||
@ -105,4 +106,4 @@ pub fn generate_manifest(
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
107
src/version/backends.rs
Normal file
107
src/version/backends.rs
Normal file
@ -0,0 +1,107 @@
|
||||
#[cfg(unix)]
|
||||
use std::os::unix::fs::PermissionsExt;
|
||||
use std::{
|
||||
fs::File,
|
||||
io::{self, Read},
|
||||
path::PathBuf,
|
||||
};
|
||||
use zip::{read::ZipFile, ZipArchive};
|
||||
|
||||
use crate::version::{
|
||||
types::{MinimumFileObject, Skippable, VersionBackend, VersionFile},
|
||||
utils::_list_files,
|
||||
};
|
||||
|
||||
pub struct PathVersionBackend {
|
||||
pub base_dir: PathBuf,
|
||||
}
|
||||
impl VersionBackend for PathVersionBackend {
|
||||
fn list_files(&mut self) -> Vec<VersionFile> {
|
||||
let mut vec = Vec::new();
|
||||
_list_files(&mut vec, &self.base_dir);
|
||||
|
||||
let mut results = Vec::new();
|
||||
|
||||
for pathbuf in vec.iter() {
|
||||
let file = File::open(pathbuf.clone()).unwrap();
|
||||
let relative = pathbuf.strip_prefix(self.base_dir.clone()).unwrap();
|
||||
let metadata = file.try_clone().unwrap().metadata().unwrap();
|
||||
let permission_object = metadata.permissions();
|
||||
let permissions = {
|
||||
let perm: u32;
|
||||
#[cfg(target_family = "unix")]
|
||||
{
|
||||
perm = permission_object.mode();
|
||||
}
|
||||
#[cfg(not(target_family = "unix"))]
|
||||
{
|
||||
perm = 0
|
||||
}
|
||||
perm
|
||||
};
|
||||
|
||||
results.push(VersionFile {
|
||||
relative_filename: relative.to_string_lossy().to_string(),
|
||||
permission: permissions,
|
||||
});
|
||||
}
|
||||
|
||||
results
|
||||
}
|
||||
|
||||
fn reader(&mut self, file: &VersionFile) -> Option<Box<(dyn MinimumFileObject + 'static)>> {
|
||||
let file = File::open(self.base_dir.join(file.relative_filename.clone())).ok()?;
|
||||
|
||||
return Some(Box::new(file));
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ZipVersionBackend {
|
||||
archive: ZipArchive<File>,
|
||||
}
|
||||
impl ZipVersionBackend {
|
||||
pub fn new(archive: PathBuf) -> Self {
|
||||
let handle = File::open(archive).unwrap();
|
||||
Self {
|
||||
archive: ZipArchive::new(handle).unwrap(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct ZipFileWrapper<'a> {
|
||||
inner: ZipFile<'a, File>,
|
||||
}
|
||||
|
||||
impl Read for ZipFileWrapper<'_> {
|
||||
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
|
||||
self.inner.read(buf)
|
||||
}
|
||||
}
|
||||
impl Skippable for ZipFileWrapper<'_> {
|
||||
fn skip(&mut self, amount: u64) {
|
||||
io::copy(&mut self.inner.by_ref().take(amount), &mut io::sink()).unwrap();
|
||||
}
|
||||
}
|
||||
impl MinimumFileObject for ZipFileWrapper<'_> {}
|
||||
|
||||
impl VersionBackend for ZipVersionBackend {
|
||||
fn list_files(&mut self) -> Vec<VersionFile> {
|
||||
let mut results = Vec::new();
|
||||
for i in 0..self.archive.len() {
|
||||
let entry = self.archive.by_index(i).unwrap();
|
||||
results.push(VersionFile {
|
||||
relative_filename: entry.name().to_owned(),
|
||||
permission: entry.unix_mode().or(Some(0)).unwrap(),
|
||||
});
|
||||
}
|
||||
results
|
||||
}
|
||||
|
||||
fn reader(&mut self, file: &VersionFile) -> Option<Box<(dyn MinimumFileObject)>> {
|
||||
let file = self.archive.by_name(&file.relative_filename).ok()?;
|
||||
let zip_file_wrapper = ZipFileWrapper { inner: file };
|
||||
|
||||
//Some(Box::new(zip_file_wrapper))
|
||||
None
|
||||
}
|
||||
}
|
||||
3
src/version/mod.rs
Normal file
3
src/version/mod.rs
Normal file
@ -0,0 +1,3 @@
|
||||
pub mod utils;
|
||||
pub mod types;
|
||||
pub mod backends;
|
||||
47
src/version/types.rs
Normal file
47
src/version/types.rs
Normal file
@ -0,0 +1,47 @@
|
||||
use std::io::{Read, Seek, SeekFrom};
|
||||
|
||||
use tokio::io::{self, AsyncRead};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct VersionFile {
|
||||
pub relative_filename: String,
|
||||
pub permission: u32,
|
||||
}
|
||||
|
||||
pub trait Skippable {
|
||||
fn skip(&mut self, amount: u64);
|
||||
}
|
||||
impl<T> Skippable for T
|
||||
where
|
||||
T: Seek,
|
||||
{
|
||||
fn skip(&mut self, amount: u64) {
|
||||
self.seek(SeekFrom::Start(amount)).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
pub trait MinimumFileObject: Read + Send + Skippable {}
|
||||
impl<T: Read + Send + Seek> MinimumFileObject for T {}
|
||||
|
||||
// Intentionally not a generic, because of types in read_file
|
||||
pub struct ReadToAsyncRead {
|
||||
pub inner: Box<(dyn Read + Send)>,
|
||||
}
|
||||
|
||||
impl AsyncRead for ReadToAsyncRead {
|
||||
fn poll_read(
|
||||
mut self: std::pin::Pin<&mut Self>,
|
||||
_cx: &mut std::task::Context<'_>,
|
||||
buf: &mut tokio::io::ReadBuf<'_>,
|
||||
) -> std::task::Poll<io::Result<()>> {
|
||||
let mut read_buf = [0u8; 8192];
|
||||
let amount = self.inner.read(&mut read_buf).unwrap();
|
||||
buf.put_slice(&read_buf[0..amount]);
|
||||
std::task::Poll::Ready(Ok(()))
|
||||
}
|
||||
}
|
||||
|
||||
pub trait VersionBackend {
|
||||
fn list_files(&mut self) -> Vec<VersionFile>;
|
||||
fn reader(&mut self, file: &VersionFile) -> Option<Box<(dyn MinimumFileObject)>>;
|
||||
}
|
||||
112
src/version/utils.rs
Normal file
112
src/version/utils.rs
Normal file
@ -0,0 +1,112 @@
|
||||
use std::{
|
||||
fs::{self, metadata, File},
|
||||
io::Read,
|
||||
path::{Path, PathBuf},
|
||||
};
|
||||
|
||||
use napi::{bindgen_prelude::*, tokio_stream::StreamExt};
|
||||
use tokio_util::codec::{BytesCodec, FramedRead};
|
||||
use zip::ZipArchive;
|
||||
|
||||
use crate::version::{
|
||||
backends::{PathVersionBackend, ZipVersionBackend},
|
||||
types::{MinimumFileObject, ReadToAsyncRead, VersionBackend, VersionFile},
|
||||
};
|
||||
|
||||
pub fn _list_files(vec: &mut Vec<PathBuf>, path: &Path) {
|
||||
if metadata(path).unwrap().is_dir() {
|
||||
let paths = fs::read_dir(path).unwrap();
|
||||
for path_result in paths {
|
||||
let full_path = path_result.unwrap().path();
|
||||
if metadata(&full_path).unwrap().is_dir() {
|
||||
_list_files(vec, &full_path);
|
||||
} else {
|
||||
vec.push(full_path);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn create_backend_for_path(path: &Path) -> Option<Box<(dyn VersionBackend + Send)>> {
|
||||
let is_directory = path.is_dir();
|
||||
if is_directory {
|
||||
return Some(Box::new(PathVersionBackend {
|
||||
base_dir: path.to_path_buf(),
|
||||
}));
|
||||
};
|
||||
|
||||
/*
|
||||
Insert checks for whatever backend you like
|
||||
*/
|
||||
|
||||
if path.ends_with(".zip") {
|
||||
return Some(Box::new(ZipVersionBackend::new(path.to_path_buf())));
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
|
||||
#[napi]
|
||||
pub fn has_backend_for_path(path: String) -> bool {
|
||||
let path = Path::new(&path);
|
||||
|
||||
let has_backend = create_backend_for_path(path).is_some();
|
||||
|
||||
has_backend
|
||||
}
|
||||
|
||||
#[napi]
|
||||
pub fn list_files(path: String) -> Vec<String> {
|
||||
let path = Path::new(&path);
|
||||
let mut backend = create_backend_for_path(path).unwrap();
|
||||
let files = backend.list_files();
|
||||
files.into_iter().map(|e| e.relative_filename).collect()
|
||||
}
|
||||
|
||||
#[napi]
|
||||
pub fn read_file(
|
||||
path: String,
|
||||
sub_path: String,
|
||||
env: &Env,
|
||||
start: Option<u32>,
|
||||
end: Option<u32>,
|
||||
) -> Option<ReadableStream<'static, BufferSlice<'static>>> {
|
||||
let path = Path::new(&path);
|
||||
let mut backend = create_backend_for_path(path).unwrap();
|
||||
let version_file = VersionFile {
|
||||
relative_filename: sub_path,
|
||||
permission: 0, // Shouldn't matter
|
||||
};
|
||||
// Use `?` operator for cleaner error propagation from `Option`
|
||||
let mut reader = backend.reader(&version_file)?;
|
||||
|
||||
// Skip the 'start' amount of bytes without seek
|
||||
if let Some(skip) = start {
|
||||
reader.skip(skip.into());
|
||||
// io::copy(&mut reader.by_ref().take(skip.into()), &mut io::sink()).unwrap();
|
||||
}
|
||||
|
||||
let async_reader = if let Some(limit) = end {
|
||||
let amount = limit - start.or(Some(0)).unwrap();
|
||||
ReadToAsyncRead {
|
||||
inner: Box::new(reader.take(amount.into())),
|
||||
}
|
||||
} else {
|
||||
ReadToAsyncRead { inner: reader }
|
||||
};
|
||||
|
||||
// Create a FramedRead stream with BytesCodec for chunking
|
||||
let stream = FramedRead::new(async_reader, BytesCodec::new())
|
||||
// Use StreamExt::map to transform each Result item
|
||||
.map(|result_item| {
|
||||
result_item
|
||||
// Apply Result::map to transform Ok(BytesMut) to Ok(Vec<u8>)
|
||||
.map(|bytes| bytes.to_vec())
|
||||
// Apply Result::map_err to transform Err(std::io::Error) to Err(napi::Error)
|
||||
.map_err(|e| napi::Error::from(e)) // napi::Error implements From<tokio::io::Error>
|
||||
});
|
||||
// Create the napi-rs ReadableStream from the tokio_stream::Stream
|
||||
// The unwrap() here means if stream creation fails, it will panic.
|
||||
// For a production system, consider returning Result<Option<...>> and handling this.
|
||||
Some(ReadableStream::create_with_stream_bytes(env, stream).unwrap())
|
||||
}
|
||||
Reference in New Issue
Block a user