Browse Source

use TLV blob code in client+server

master
Erik Zscheile 3 years ago
parent
commit
a0dac1ce4c
4 changed files with 131 additions and 37 deletions
  1. +23
    -17
      client/src/main.rs
  2. +6
    -11
      server/src/main.rs
  3. +18
    -9
      zsittle/src/lib.rs
  4. +84
    -0
      zsittle/src/tokiowrap.rs

+ 23
- 17
client/src/main.rs View File

@ -6,9 +6,8 @@ use std::{
sync::Arc,
};
use structopt::StructOpt;
use zsittle::futures_util::future;
use zsittle::tokio::{
io::{copy, split, stdin as tokio_stdin, stdout as tokio_stdout},
io::{split, stdin as tokio_stdin, stdout as tokio_stdout, AsyncBufReadExt, AsyncWriteExt},
net::TcpStream,
runtime,
};
@ -41,9 +40,11 @@ fn main() -> io::Result<()> {
let domain = options.domain.unwrap_or(options.host);
let mut runtime = runtime::Builder::new()
.basic_scheduler()
.threaded_scheduler()
.enable_io()
.build()?;
let handle = runtime.handle().clone();
let mut config = ClientConfig::new();
if let Some(cafile) = &options.cafile {
let mut pem = BufReader::new(File::open(cafile)?);
@ -56,27 +57,32 @@ fn main() -> io::Result<()> {
.root_store
.add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS);
}
let connector = TlsConnector::from(Arc::new(config));
let domain = DNSNameRef::try_from_ascii_str(&domain)
.map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "invalid dnsname"))?;
let fut = async {
let stream = TcpStream::connect(&addr).await?;
let (mut stdin, mut stdout) = (tokio_stdin(), tokio_stdout());
let domain = DNSNameRef::try_from_ascii_str(&domain)
.map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "invalid dnsname"))?;
let (stdin, mut stdout) = (tokio_stdin(), tokio_stdout());
let stream = connector.connect(domain, stream).await?;
let (mut reader, mut writer) = split(stream);
future::select(
copy(&mut reader, &mut stdout),
copy(&mut stdin, &mut writer),
)
.await
.factor_first()
.0?;
let wtk: zsittle::tokio::task::JoinHandle<Result<(), zsittle::ReadError>> =
handle.spawn(async move {
loop {
let blb: zsittle::Blob = zsittle::read_blob(&mut reader).await?;
stdout.write_all(&blb.data[..]).await?;
stdout.write_all(b"\n").await?;
}
});
let mut inps = zsittle::tokio::io::BufReader::new(stdin).lines();
while let Some(line) = inps.next_line().await? {
zsittle::write_blob(&mut writer, 0, line.as_bytes()).await?;
}
let _ = wtk.await?;
Ok(())
};


+ 6
- 11
server/src/main.rs View File

@ -10,7 +10,7 @@ use std::{
use structopt::StructOpt;
use zsittle::futures_util::{future::TryFutureExt, stream::StreamExt};
use zsittle::tokio::{
io::{split, AsyncBufReadExt, AsyncWriteExt},
io::{split, AsyncWriteExt},
net::TcpListener,
runtime,
};
@ -116,11 +116,7 @@ fn main() -> io::Result<()> {
let keep = if k == origin {
true
} else {
match v.write(msg.as_bytes()).await {
Ok(0) | Err(_) => false,
Ok(n) if n < msg.len() => false,
Ok(_) => true,
}
zsittle::write_blob(&mut v, 0, msg.as_bytes()).await.is_ok()
};
if keep {
new_outputs.insert(k, v);
@ -140,15 +136,14 @@ fn main() -> io::Result<()> {
let fut = async move {
let (reader, writer) = split(acceptor.accept(stream).await?);
let mut reader = zsittle::tokio::io::BufReader::new(reader).lines();
let mut reader = zsittle::tokio::io::BufReader::new(reader);
println!("Connected: {}", peer_addr);
distr_send(&distr_in, peer_addr, DistrInner::Connect(writer));
while let Some(line) = reader.next_line().await? {
if line.is_empty() {
break;
while let Ok(blob) = zsittle::read_blob(&mut reader).await {
if let Ok(line) = std::str::from_utf8(&blob.data[..]) {
distr_send(&distr_in, peer_addr, DistrInner::Message(line.to_string()));
}
distr_send(&distr_in, peer_addr, DistrInner::Message(line));
}
distr_send(&distr_in, peer_addr, DistrInner::Disconnect);
println!("Disconnected: {}", peer_addr);


+ 18
- 9
zsittle/src/lib.rs View File

@ -1,3 +1,5 @@
//! NOTE: we only support blobs with a size up to 2^31 bytes (2 Gibibytes).
pub use {futures_util, serde, tokio, tokio_rustls};
use {
@ -10,6 +12,8 @@ use {
tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt},
};
mod tokiowrap;
/// typ often corresponds to the used format version
pub type BlobType = u32;
@ -29,15 +33,20 @@ where
Ok(())
}
/// NOTE: we only support blobs with a size up to 2^31 bytes (2 Gibibytes).
pub async fn read_blob<R>(r: &mut R) -> Result<Blob, unsigned_varint::io::ReadError>
pub type ReadError = unsigned_varint::io::ReadError;
pub async fn read_blob<R>(r: &mut R) -> Result<Blob, ReadError>
where
R: AsyncRead + Unpin + futures_io::AsyncRead,
R: AsyncRead + Unpin,
{
use unsigned_varint::aio::read_u32;
let typ = read_u32(&mut *r).await?;
let len = read_u32(&mut *r).await?.try_into().unwrap();
let (typ, len) = {
let mut r2 = tokiowrap::FromAdapter { adapted: &mut *r };
let typ = read_u32(&mut r2).await?;
let len = read_u32(&mut r2).await?.try_into().unwrap();
(typ, len)
};
let mut datac = Vec::with_capacity(len);
datac.resize(len, 0);
@ -49,15 +58,15 @@ where
Ok(Blob { typ, data })
}
pub async fn write_blob<W>(w: &mut W, blob: Blob) -> io::Result<()>
pub async fn write_blob<W>(w: &mut W, blob_typ: BlobType, blob_data: &[u8]) -> io::Result<()>
where
W: AsyncWrite + Unpin,
{
let mut z = flate2r::DeflateEncoder::new(&blob.data[..], flate2::Compression::fast());
let mut datac = Vec::with_capacity(blob.data.len());
let mut z = flate2r::DeflateEncoder::new(blob_data, flate2::Compression::fast());
let mut datac = Vec::with_capacity(blob_data.len());
z.read_to_end(&mut datac)?;
write_u32(w, blob.typ).await?;
write_u32(w, blob_typ).await?;
write_u32(w, datac.len().try_into().unwrap()).await?;
w.write_all(&datac[..]).await?;
Ok(())


+ 84
- 0
zsittle/src/tokiowrap.rs View File

@ -0,0 +1,84 @@
// original source: https://gist.githubusercontent.com/lolgesten/7f350e8c7e321c0dd2df2558173e3ad8/raw/7540da974c068434a4aa6462b5d365367d61dc74/tokio.rs
#![allow(dead_code)]
use futures_io::{AsyncRead, AsyncWrite};
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::AsyncRead as TokioAsyncRead;
use tokio::io::AsyncWrite as TokioAsyncWrite;
pub trait Stream: AsyncRead + AsyncWrite + Unpin + Send + 'static {}
pub fn from_tokio<Z>(adapted: Z) -> impl Stream
where
Z: TokioAsyncRead + TokioAsyncWrite + Unpin + Send + 'static,
{
FromAdapter { adapted }
}
pub struct FromAdapter<Z> {
pub(crate) adapted: Z,
}
impl<Z: TokioAsyncRead + Unpin> AsyncRead for FromAdapter<Z> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
Pin::new(&mut self.get_mut().adapted).poll_read(cx, buf)
}
}
impl<Z: TokioAsyncWrite + Unpin> AsyncWrite for FromAdapter<Z> {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, io::Error>> {
Pin::new(&mut self.get_mut().adapted).poll_write(cx, buf)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
Pin::new(&mut self.get_mut().adapted).poll_flush(cx)
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
Pin::new(&mut self.get_mut().adapted).poll_shutdown(cx)
}
}
impl<Z: TokioAsyncRead + TokioAsyncWrite + Unpin + Send + 'static> Stream for FromAdapter<Z> {}
pub fn to_tokio<S: Stream>(adapted: S) -> TokioStream<S> {
TokioStream { adapted }
}
pub struct TokioStream<S> {
adapted: S,
}
impl<S: Stream> TokioAsyncRead for TokioStream<S> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
Pin::new(&mut self.get_mut().adapted).poll_read(cx, buf)
}
}
impl<S: Stream> TokioAsyncWrite for TokioStream<S> {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, io::Error>> {
Pin::new(&mut self.get_mut().adapted).poll_write(cx, buf)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
Pin::new(&mut self.get_mut().adapted).poll_flush(cx)
}
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
Pin::new(&mut self.get_mut().adapted).poll_close(cx)
}
}

Loading…
Cancel
Save