pub struct Channel {
    reader: Mutex<ReadHalf<Box<dyn PtStream>>>,
    writer: Mutex<WriteHalf<Box<dyn PtStream>>>,
    message_subsystem: MessageSubsystem,
    stop_publisher: PublisherPtr<Error>,
    receive_task: StoppableTaskPtr,
    stopped: AtomicBool,
    pub(super) session: SessionWeakPtr,
    version: Mutex<Option<Arc<VersionMessage>>>,
    pub info: ChannelInfo,
}
Async channel for communication between nodes.
Fields
reader: Mutex<ReadHalf<Box<dyn PtStream>>>
The reading half of the transport stream
writer: Mutex<WriteHalf<Box<dyn PtStream>>>
The writing half of the transport stream
message_subsystem: MessageSubsystem
The message subsystem instance for this channel
stop_publisher: PublisherPtr<Error>
Publisher listening for stop signal for closing this channel
receive_task: StoppableTaskPtr
Task that is listening for the stop signal
stopped: AtomicBool
A boolean marking if this channel is stopped
session: SessionWeakPtr
Weak pointer to respective session
version: Mutex<Option<Arc<VersionMessage>>>
The version message of the node we are connected to. Some if the version exchange has already occurred, None otherwise.
info: ChannelInfo
Channel debug info
Implementations
impl Channel
pub async fn new( stream: Box<dyn PtStream>, resolve_addr: Option<Url>, connect_addr: Url, session: SessionWeakPtr, ) -> Arc<Self>
Sets up a new channel. Creates a reader and writer PtStream and the message publisher subsystem. Performs a network handshake on the subsystem dispatchers.
async fn setup_dispatchers(subsystem: &MessageSubsystem)
Perform network handshake for message subsystem dispatchers.
pub fn start(self: Arc<Self>, executor: Arc<Executor<'_>>)
Starts the channel. Runs a receive loop to start receiving messages or handles a network failure.
pub async fn stop(&self)
Stops the channel. Notifies all publishers that the channel has been closed in handle_stop().
pub async fn subscribe_stop(&self) -> Result<Subscription<Error>>
Creates a subscription to a stopped signal. If the channel is stopped then this will return a ChannelStopped error.
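The stop-subscription behaviour described above can be illustrated with a small synchronous sketch. Everything here is hypothetical: `SketchChannel` and `SketchError` stand in for the real types, and `std::sync::mpsc` stands in for the crate's publisher/subscription machinery, which is not shown on this page.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Mutex;

/// Hypothetical error type standing in for the crate's `Error`.
#[derive(Debug, Clone, PartialEq)]
pub enum SketchError {
    ChannelStopped,
}

/// Hypothetical stand-in for `Channel`, keeping only the stop-related state.
pub struct SketchChannel {
    stopped: AtomicBool,
    // One sender per subscriber; a simplification of a real publisher.
    subscribers: Mutex<Vec<Sender<SketchError>>>,
}

impl SketchChannel {
    pub fn new() -> Self {
        Self {
            stopped: AtomicBool::new(false),
            subscribers: Mutex::new(Vec::new()),
        }
    }

    pub fn is_stopped(&self) -> bool {
        self.stopped.load(Ordering::SeqCst)
    }

    /// Refuses new subscriptions once the channel has been stopped.
    pub fn subscribe_stop(&self) -> Result<Receiver<SketchError>, SketchError> {
        if self.is_stopped() {
            return Err(SketchError::ChannelStopped);
        }
        let (tx, rx) = channel();
        self.subscribers.lock().unwrap().push(tx);
        Ok(rx)
    }

    /// Marks the channel as stopped and notifies each subscriber once.
    pub fn stop(&self) {
        // `swap` returns the previous value, so repeated calls do nothing.
        if !self.stopped.swap(true, Ordering::SeqCst) {
            for tx in self.subscribers.lock().unwrap().drain(..) {
                // A subscriber that has gone away is simply skipped.
                let _ = tx.send(SketchError::ChannelStopped);
            }
        }
    }
}
```

In this sketch a subscriber created before `stop()` receives `ChannelStopped` as a message, while a caller that subscribes afterwards gets it back as an error straight away.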
pub fn is_stopped(&self) -> bool
pub async fn send<M: Message>(&self, message: &M) -> Result<()>
Sends a message across a channel. First it converts the message into a SerializedMessage and then calls send_serialized to send it. Returns an error if something goes wrong.
pub async fn send_serialized(&self, message: &SerializedMessage) -> Result<()>
Sends the encoded payload of the provided SerializedMessage across the channel. Calls send_message, which creates a new payload and sends it over the network transport as a packet. Returns an error if something goes wrong.
async fn send_message(&self, message: &SerializedMessage) -> Result<()>
Sends the encoded payload of the provided SerializedMessage by writing the data to the channel async stream.
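The three send methods form a small pipeline: typed message, then serialized message, then bytes on the stream. The following is a rough synchronous sketch of that layering. The trait `SketchMessage`, the struct `SketchSerialized`, and the packet layout are all assumptions made for illustration; the crate's actual wire format is not documented on this page.

```rust
use std::io::{self, Write};

/// Hypothetical message trait: a command name plus a byte encoding.
pub trait SketchMessage {
    const NAME: &'static str;
    fn encode(&self) -> Vec<u8>;
}

/// Hypothetical stand-in for `SerializedMessage`.
pub struct SketchSerialized {
    pub command: String,
    pub payload: Vec<u8>,
}

/// Lowest layer: writes one packet to the stream.
/// Assumed layout (not the crate's wire format): 1-byte command length,
/// command bytes, 4-byte little-endian payload length, payload bytes.
pub fn send_message<W: Write>(writer: &mut W, message: &SketchSerialized) -> io::Result<()> {
    let cmd = message.command.as_bytes();
    if cmd.len() > u8::MAX as usize {
        return Err(io::Error::new(io::ErrorKind::InvalidInput, "command too long"));
    }
    if message.payload.len() > u32::MAX as usize {
        return Err(io::Error::new(io::ErrorKind::InvalidInput, "payload too long"));
    }
    writer.write_all(&[cmd.len() as u8])?;
    writer.write_all(cmd)?;
    writer.write_all(&(message.payload.len() as u32).to_le_bytes())?;
    writer.write_all(&message.payload)?;
    writer.flush()
}

/// Middle layer: sends an already-serialized message.
pub fn send_serialized<W: Write>(writer: &mut W, message: &SketchSerialized) -> io::Result<()> {
    send_message(writer, message)
}

/// Top layer: serializes a typed message, then hands it down.
pub fn send<M: SketchMessage, W: Write>(writer: &mut W, message: &M) -> io::Result<()> {
    let serialized = SketchSerialized {
        command: M::NAME.to_string(),
        payload: message.encode(),
    };
    send_serialized(writer, &serialized)
}
```

Keeping a separate entry point for serialized messages means a message that was encoded once could be sent on several channels without being re-encoded each time.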
pub async fn read_command<R: AsyncRead + Unpin + Send + Sized>( &self, stream: &mut R, ) -> Result<String>
Returns a decoded Message command. We start by extracting the length from the stream, then allocate the precise buffer for this length using stream.take(). This manual deserialization provides basic DoS protection, since it prevents nodes from sending an arbitrarily large payload.
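A minimal synchronous sketch of this length-then-take pattern follows, using `std::io::Read` in place of the async stream. The 4-byte little-endian length prefix and the `MAX_COMMAND_LEN` limit are assumptions chosen for the example; the real method may encode the length differently and enforce a different bound.

```rust
use std::io::{self, Read};

/// Assumed upper bound on a command name; the real limit is not stated here.
pub const MAX_COMMAND_LEN: u64 = 64;

/// Reads a length-prefixed command from `stream`.
/// Assumption: the length is a 4-byte little-endian integer.
pub fn read_command<R: Read>(stream: &mut R) -> io::Result<String> {
    // Read the declared length first.
    let mut len_buf = [0u8; 4];
    stream.read_exact(&mut len_buf)?;
    let len = u64::from(u32::from_le_bytes(len_buf));

    // Reject oversized declarations before reading any more bytes.
    if len > MAX_COMMAND_LEN {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            "command length exceeds limit",
        ));
    }

    // `take` caps how many bytes can be pulled from the stream,
    // so the buffer can never grow past `len`.
    let mut bytes = Vec::new();
    stream.by_ref().take(len).read_to_end(&mut bytes)?;
    if bytes.len() as u64 != len {
        return Err(io::Error::new(
            io::ErrorKind::UnexpectedEof,
            "stream ended before the full command was read",
        ));
    }

    String::from_utf8(bytes)
        .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "command is not valid UTF-8"))
}
```

The point of the pattern is that the reader, not the sender, decides how much memory is committed: an oversized length is rejected up front, and a truthful length still cannot cause more than `len` bytes to be buffered.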
pub async fn subscribe_msg<M: Message>(&self) -> Result<MessageSubscription<M>>
Subscribe to a message on the message subsystem.
async fn handle_stop(self: Arc<Self>, result: Result<()>)
Handle network errors. Panic if error passes silently, otherwise broadcast the error.
async fn main_receive_loop(self: Arc<Self>) -> Result<()>
Run the receive loop. Start receiving messages or handle network failure.
pub fn address(&self) -> &Url
Returns the relevant socket address for this connection. If this is an outbound connection, the transport-processed resolve_addr will be returned. Otherwise for inbound connections it will default to connect_addr.
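The fallback between the two addresses can be sketched as below. `SketchInfo` is a hypothetical stand-in for the address fields, and `String` replaces `Url` so that the example has no dependencies; the addresses in the usage note are illustrative values only.

```rust
/// Hypothetical stand-in for the channel's address fields,
/// using `String` in place of `Url`.
pub struct SketchInfo {
    /// Transport-processed address; assumed to be set for outbound connections.
    pub resolve_addr: Option<String>,
    /// Address without transport processing; always present.
    pub connect_addr: String,
}

impl SketchInfo {
    /// Prefers `resolve_addr` when it exists, otherwise falls back to `connect_addr`.
    pub fn address(&self) -> &str {
        match &self.resolve_addr {
            Some(addr) => addr.as_str(),
            None => self.connect_addr.as_str(),
        }
    }
}
```

With `resolve_addr: Some("tcp://203.0.113.5:8000")` the sketch returns that value; with `resolve_addr: None` it returns whatever `connect_addr` holds.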
pub fn resolve_addr(&self) -> Option<Url>
Returns the socket address that has undergone transport processing, if it exists. Returns None otherwise.
pub fn connect_addr(&self) -> &Url
Return the socket address without transport processing.
pub(crate) async fn set_version(&self, version: Arc<VersionMessage>)
Set the VersionMessage of the node this channel is connected to. Called on receiving a version message in ProtocolVersion.
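The `Mutex<Option<Arc<...>>>` shape of the `version` field can be illustrated with a small sketch. `SketchPeer` and `SketchVersion` are hypothetical, the `version()` getter is added only for the example, and `std::sync::Mutex` stands in for the async mutex that the real (async) `set_version` presumably uses.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for `VersionMessage`.
pub struct SketchVersion {
    pub node_id: String,
}

/// Hypothetical stand-in for the part of `Channel` that tracks the peer version.
pub struct SketchPeer {
    // `None` until the version exchange has happened.
    version: Mutex<Option<Arc<SketchVersion>>>,
}

impl SketchPeer {
    pub fn new() -> Self {
        Self { version: Mutex::new(None) }
    }

    /// Records the peer's version message once it has been received.
    pub fn set_version(&self, version: Arc<SketchVersion>) {
        *self.version.lock().unwrap() = Some(version);
    }

    /// Returns a cheap clone of the shared version, if one is known yet.
    pub fn version(&self) -> Option<Arc<SketchVersion>> {
        self.version.lock().unwrap().clone()
    }
}
```

Wrapping the message in an `Arc` lets readers hold on to the version without keeping the lock, and the `Option` makes the "exchange not yet done" state explicit.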
pub fn message_subsystem(&self) -> &MessageSubsystem
Returns the inner MessageSubsystem reference.
fn session(&self) -> Arc<dyn Session>
pub fn session_type_id(&self) -> SessionBitFlag
fn p2p(&self) -> P2pPtr
fn is_eof_error(err: &Error) -> bool
Trait Implementations
Auto Trait Implementations
impl !Freeze for Channel
impl !RefUnwindSafe for Channel
impl Send for Channel
impl Sync for Channel
impl Unpin for Channel
impl !UnwindSafe for Channel
Blanket Implementations
impl<T> ArchivePointee for T
type ArchivedMetadata = ()
fn pointer_metadata(_: &<T as ArchivePointee>::ArchivedMetadata) -> <T as Pointee>::Metadata
impl<'a, T, E> AsTaggedExplicit<'a, E> for T where T: 'a
impl<'a, T, E> AsTaggedImplicit<'a, E> for T where T: 'a
impl<T> BorrowMut<T> for T where T: ?Sized
fn borrow_mut(&mut self) -> &mut T
impl<T> Conv for T
impl<F, W, T, D> Deserialize<With<T, W>, D> for F
fn deserialize(&self, deserializer: &mut D) -> Result<With<T, W>, <D as Fallible>::Error>
impl<T> Downcast for T where T: Any
fn into_any(self: Box<T>) -> Box<dyn Any>
Convert Box<dyn Trait> (where Trait: Downcast) to Box<dyn Any>. Box<dyn Any> can then be further downcast into Box<ConcreteType> where ConcreteType implements Trait.
fn into_any_rc(self: Rc<T>) -> Rc<dyn Any>
Convert Rc<Trait> (where Trait: Downcast) to Rc<Any>. Rc<Any> can then be further downcast into Rc<ConcreteType> where ConcreteType implements Trait.
fn as_any(&self) -> &(dyn Any + 'static)
Convert &Trait (where Trait: Downcast) to &Any. This is needed since Rust cannot generate &Any’s vtable from &Trait’s.
fn as_any_mut(&mut self) -> &mut (dyn Any + 'static)
Convert &mut Trait (where Trait: Downcast) to &mut Any. This is needed since Rust cannot generate &mut Any’s vtable from &mut Trait’s.
impl<T> DowncastSync for T
impl<T> FmtForward for T
fn fmt_binary(self) -> FmtBinary<Self> where Self: Binary
Causes self to use its Binary implementation when Debug-formatted.
fn fmt_display(self) -> FmtDisplay<Self> where Self: Display
Causes self to use its Display implementation when Debug-formatted.
fn fmt_lower_exp(self) -> FmtLowerExp<Self> where Self: LowerExp
Causes self to use its LowerExp implementation when Debug-formatted.
fn fmt_lower_hex(self) -> FmtLowerHex<Self> where Self: LowerHex
Causes self to use its LowerHex implementation when Debug-formatted.
fn fmt_octal(self) -> FmtOctal<Self> where Self: Octal
Causes self to use its Octal implementation when Debug-formatted.
fn fmt_pointer(self) -> FmtPointer<Self> where Self: Pointer
Causes self to use its Pointer implementation when Debug-formatted.
fn fmt_upper_exp(self) -> FmtUpperExp<Self> where Self: UpperExp
Causes self to use its UpperExp implementation when Debug-formatted.
fn fmt_upper_hex(self) -> FmtUpperHex<Self> where Self: UpperHex
Causes self to use its UpperHex implementation when Debug-formatted.
fn fmt_list(self) -> FmtList<Self> where &'a Self: for<'a> IntoIterator
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.
impl<T> LayoutRaw for T
fn layout_raw(_: <T as Pointee>::Metadata) -> Result<Layout, LayoutError>
impl<T> Pipe for T where T: ?Sized
fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> R where Self: Sized
fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> R where R: 'a
Borrows self and passes that borrow into the pipe function.
fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> R where R: 'a
Mutably borrows self and passes that borrow into the pipe function.
fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
fn pipe_borrow_mut<'a, B, R>(&'a mut self, func: impl FnOnce(&'a mut B) -> R) -> R
fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
Borrows self, then passes self.as_ref() into the pipe function.
fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
Mutably borrows self, then passes self.as_mut() into the pipe function.
fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
Borrows self, then passes self.deref() into the pipe function.
impl<T> Pointable for T
impl<T> Tap for T
fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
Immutable access to the Borrow<B> of a value.
fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
Mutable access to the BorrowMut<B> of a value.
fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
Immutable access to the AsRef<R> view of a value.
fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
Mutable access to the AsMut<R> view of a value.
fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
Immutable access to the Deref::Target of a value.
fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
Mutable access to the Deref::Target of a value.
fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
Calls .tap() only in debug builds, and is erased in release builds.
fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
Calls .tap_mut() only in debug builds, and is erased in release builds.
fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
Calls .tap_borrow() only in debug builds, and is erased in release builds.
fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
Calls .tap_borrow_mut() only in debug builds, and is erased in release builds.
fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
Calls .tap_ref() only in debug builds, and is erased in release builds.
fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
Calls .tap_ref_mut() only in debug builds, and is erased in release builds.
fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
Calls .tap_deref() only in debug builds, and is erased in release builds.