class MuxingServerSocketChannel extends DelegatingServerSocketChannel<ServerSocketChannel>

Shares a listening endpoint (i.e. a ServerSocketChannel) among multiple MuxServerSocketChannels. Accepted SocketChannels are demultiplexed based on DatagramPacketFilters and dispatched for acceptance through matching MuxServerSocketChannels.

Modifier and Type | Field and Description |
---|---|
private static Selector | acceptSelector: The Selector which waits for incoming network connections on all muxingServerSocketChannels. |
private static Thread | acceptThread: The Thread which waits for and accepts incoming network connections on all muxingServerSocketChannels. |
private static InetAddress | ANY_LOCAL_ADDRESS: Reference to the 0.0.0.0 IPv4 or 0::0 IPv6 address for "wildcard" matching purposes. |
private static List<MuxingServerSocketChannel> | muxingServerSocketChannels: The (global) list of existing MuxingServerSocketChannels. |
private List<MuxServerSocketChannel> | muxServerSocketChannels: The list of MuxServerSocketChannels created by and delegating to this instance. |
private Queue<SocketChannel> | readQ: The list of SocketChannels which have been accepted by this MuxingServerSocketChannel, are being read from, and have not been accepted by the DatagramPacketFilter of any MuxServerSocketChannel yet. |
private Selector | readSelector |
private Thread | readThread |
private static int | SELECTOR_SELECT_TIMEOUT: The maximum number of milliseconds to wait in a Selector.select(long). |
private static int | SOCKET_CHANNEL_READ_CAPACITY: The maximum number of bytes to be read from SocketChannels accepted by MuxingServerSocketChannels in order to demultiplex (i.e. filter) them into MuxServerSocketChannels. |
(package private) static int | SOCKET_CHANNEL_READ_TIMEOUT: The maximum number of milliseconds to wait for an accepted SocketChannel to provide incoming/readable data before it is considered abandoned by the client. |
private Object | syncRoot: The Object which synchronizes the access to the state of this MuxingServerSocketChannel such as muxServerSocketChannels and readQ. |
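
Fields inherited from class DelegatingServerSocketChannel: delegateAsSelChImpl
Fields inherited from class BaseDelegatingServerSocketChannel: delegate
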
Constructor and Description |
---|
MuxingServerSocketChannel(ServerSocketChannel delegate): Initializes a new MuxingServerSocketChannel instance which is to share the listening endpoint of a specific ServerSocketChannel among multiple MuxServerSocketChannels. |
Modifier and Type | Method and Description |
---|---|
SocketChannel | accept(): Forwards to BaseDelegatingServerSocketChannel.delegate. |
private static void | addMuxingServerSocketChannel(MuxingServerSocketChannel channel): Adds a specific MuxingServerSocketChannel to the (global) list of existing MuxingServerSocketChannels and schedules acceptance of incoming network connections on it. |
protected void | addMuxServerSocketChannel(MuxServerSocketChannel channel): Adds a specific MuxServerSocketChannel to the list of MuxServerSocketChannels created by and delegating to this instance. |
private void | closeAbandonedSocketChannels(): Weeds out SocketChannels which were classified/filtered into a MuxServerSocketChannel but were not accepted (out of it) for a long time. |
static void | closeNoExceptions(Channel channel): Closes a Channel and swallows any IOException. |
protected MuxServerSocketChannel | createMuxServerSocketChannel(DatagramPacketFilter filter): Initializes a new MuxServerSocketChannel instance which is to delegate to this instance and is to demultiplex incoming network connections and packets using a specific DatagramPacketFilter. |
private boolean | filterAccept(DatagramPacket p, SocketChannel channel): Determines whether any of the MuxServerSocketChannels created by and delegating to this instance demultiplexes (i.e. recognizes) a specific SocketChannel based on a specific DatagramPacket read from it and will make it available for acceptance. |
private static MuxingServerSocketChannel | findMuxingServerSocketChannel(SocketAddress localAddr): Finds the first open MuxingServerSocketChannel in the (global) list of existing MuxingServerSocketChannels which is bound to a specific local SocketAddress. |
protected SocketChannel | implAccept(SocketChannel accepted): Allows extenders to optionally configure a SocketChannel accepted by BaseDelegatingServerSocketChannel.delegate before it is returned by BaseDelegatingServerSocketChannel.accept(). |
protected MuxingServerSocket | implSocket(ServerSocket socket): Allows extenders to optionally configure the ServerSocket of BaseDelegatingServerSocketChannel.delegate before it is returned by BaseDelegatingServerSocketChannel.socket(). |
private static void | maybeCloseAcceptSelector(): If acceptSelector exists and is open, tries to close it without throwing an IOException. |
protected int | maybeRead(SocketChannel channel, ByteBuffer buf): Attempts to read from a specific SocketChannel into a specific ByteBuffer without throwing an IOException if the reading from the channel fails or there is insufficient room in buf to write into. |
protected void | muxServerSocketChannelAdded(MuxServerSocketChannel channel): Notifies this MuxingServerSocketChannel that a specific MuxServerSocketChannel was added to muxServerSocketChannels. |
static MuxServerSocketChannel | openAndBind(Map<String,Object> properties, SocketAddress endpoint, int backlog, DatagramPacketFilter filter): Opens and binds a new MuxServerSocketChannel instance. |
private static void | runInAcceptThread(): Runs in acceptThread and waits for and accepts incoming network connections on all muxingServerSocketChannels. |
protected void | runInReadThread(): Runs in readThread and reads from all SocketChannels in readQ and serves them for demultiplexing to muxServerSocketChannels. |
private static <T extends SelectableChannel> void | runInSelectorThread(Object syncRoot, Supplier<Thread> threadSupplier, Supplier<Selector> selectorSupplier, int selectionKeyOps, Iterable<T> channels, BiPredicate<T,SelectionKey> predicate): Continually tests a BiPredicate on a set of SelectableChannels. |
private static void | scheduleAccept(MuxingServerSocketChannel channel): Schedules a specific MuxingServerSocketChannel for acceptance of incoming network connections in acceptThread. |
protected void | scheduleRead(SocketChannel channel): Queues a specific SocketChannel to be read and demultiplexed into a MuxServerSocketChannel. |
private boolean | testRunInReadThreadPredicate(SocketChannel ch, SelectionKey sk) |
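
Methods inherited from class DelegatingServerSocketChannel: getFD, getFDVal, kill, translateAndSetInterestOps, translateAndSetReadyOps, translateAndUpdateReadyOps
Methods inherited from class BaseDelegatingServerSocketChannel: bind, getLocalAddress, getOption, implCloseSelectableChannel, implConfigureBlocking, isBound, setOption, socket, supportedOptions
Methods inherited from class ServerSocketChannel: bind, open, validOps
Methods inherited from class AbstractSelectableChannel: blockingLock, configureBlocking, implCloseChannel, isBlocking, isRegistered, keyFor, provider, register
Methods inherited from class SelectableChannel: register
Methods inherited from class AbstractInterruptibleChannel: begin, close, end, isOpen
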
private static final InetAddress ANY_LOCAL_ADDRESS

private static Selector acceptSelector
The Selector which waits for incoming network connections on all muxingServerSocketChannels.

private static Thread acceptThread
The Thread which waits for and accepts incoming network connections on all muxingServerSocketChannels.

private static final List<MuxingServerSocketChannel> muxingServerSocketChannels

private static final int SELECTOR_SELECT_TIMEOUT
The maximum number of milliseconds to wait in a Selector.select(long). The timeout should be a precaution though, i.e. (1) it should ideally not be necessary and (2) it should be long enough not to hurt the performance of the application unnecessarily.

static final int SOCKET_CHANNEL_READ_TIMEOUT
The maximum number of milliseconds to wait for an accepted SocketChannel to provide incoming/readable data before it is considered abandoned by the client.

private static final int SOCKET_CHANNEL_READ_CAPACITY
The maximum number of bytes to be read from SocketChannels accepted by MuxingServerSocketChannels in order to demultiplex (i.e. filter) them into MuxServerSocketChannels.

private final List<MuxServerSocketChannel> muxServerSocketChannels

private final Queue<SocketChannel> readQ
The list of SocketChannels which have been accepted by this MuxingServerSocketChannel, are being read from, and have not been accepted by the DatagramPacketFilter of any MuxServerSocketChannel yet.

private final Selector readSelector
private Thread readThread

private final Object syncRoot
The Object which synchronizes the access to the state of this MuxingServerSocketChannel such as muxServerSocketChannels and readQ.

public MuxingServerSocketChannel(ServerSocketChannel delegate) throws IOException
Initializes a new MuxingServerSocketChannel instance which is to share the listening endpoint of a specific ServerSocketChannel among multiple MuxServerSocketChannels.
Parameters:
delegate - the ServerSocketChannel for which the new instance is to provide listening endpoint sharing
Throws:
IOException - if an I/O error occurs

private static void addMuxingServerSocketChannel(MuxingServerSocketChannel channel) throws IOException
Adds a specific MuxingServerSocketChannel to the (global) list of existing MuxingServerSocketChannels and schedules acceptance of incoming network connections on it.
Parameters:
channel - the MuxingServerSocketChannel to add to the (global) list of existing MuxingServerSocketChannels and to schedule for acceptance of incoming network connections
Throws:
IOException - if an I/O error occurs

public static void closeNoExceptions(Channel channel)
Closes a Channel and swallows any IOException.
Parameters:
channel - the Channel to close

private static MuxingServerSocketChannel findMuxingServerSocketChannel(SocketAddress localAddr)
Finds the first open MuxingServerSocketChannel in the (global) list of existing MuxingServerSocketChannels which is bound to a specific local SocketAddress.
Parameters:
localAddr - the local SocketAddress on which the bound MuxingServerSocketChannel is to be found
Returns:
the first open MuxingServerSocketChannel in the (global) list of existing MuxingServerSocketChannels which is bound to the specified localAddr, or null
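
For the closeNoExceptions(Channel) utility documented above, a minimal sketch of the "close and swallow" idea could look like the following. The class name ChannelCloser and the method name closeQuietly are hypothetical stand-ins, and the sketch only assumes what the description states: the Channel is closed and any IOException is discarded.

```java
import java.io.IOException;
import java.nio.channels.Channel;

final class ChannelCloser {
    /** Hypothetical stand-in for closeNoExceptions(Channel): closes and ignores IOException. */
    static void closeQuietly(Channel channel) {
        try {
            channel.close();
        } catch (IOException ioe) {
            // Deliberately ignored: the caller only wants the channel gone.
        }
    }

    public static void main(String[] args) {
        // A stub Channel whose close() always fails, to show that nothing propagates.
        Channel failing = new Channel() {
            private boolean open = true;

            @Override
            public boolean isOpen() {
                return open;
            }

            @Override
            public void close() throws IOException {
                open = false;
                throw new IOException("simulated close failure");
            }
        };

        closeQuietly(failing);
        System.out.println("open after close: " + failing.isOpen());
    }
}
```

Such a helper suits cleanup paths (for example, abandoned or rejected connections) where a failure to close cannot be handled meaningfully anyway.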

private static void maybeCloseAcceptSelector()
If acceptSelector exists and is open, tries to close it without throwing an IOException.

public static MuxServerSocketChannel openAndBind(Map<String,Object> properties, SocketAddress endpoint, int backlog, DatagramPacketFilter filter) throws IOException
Opens and binds a new MuxServerSocketChannel instance. If there are other (existing) MuxServerSocketChannels open and bound on the specified listening endpoint, the new instance will share it with them.
Parameters:
properties - a Map of the values to be assigned to properties of the underlying ServerSocketChannel which is to actually listen on the specified endpoint. If the new instance is not the first to open and bind the specified endpoint, the properties and their respective values may not be used.
endpoint - the IP and port the new instance is to bind to
backlog - the requested maximum number of pending incoming connections to be queued. If the new instance is not the first to open and bind the specified endpoint, the value may not be used.
filter - the DatagramPacketFilter to demultiplex (i.e. recognize) the content meant for the new instance
Returns:
a new MuxServerSocketChannel instance open and bound on the specified listening endpoint
Throws:
IOException - if an I/O error occurs

private static void runInAcceptThread()
Runs in acceptThread and waits for and accepts incoming network connections on all muxingServerSocketChannels.

private static <T extends SelectableChannel> void runInSelectorThread(Object syncRoot, Supplier<Thread> threadSupplier, Supplier<Selector> selectorSupplier, int selectionKeyOps, Iterable<T> channels, BiPredicate<T,SelectionKey> predicate)
Continually tests a BiPredicate on a set of SelectableChannels.
Type Parameters:
T - the element type of channels
Parameters:
syncRoot - the Object to synchronize the access to threadSupplier, selectorSupplier, channels, and predicate. It should be notified whenever the values supplied by threadSupplier, selectorSupplier, and channels change.
threadSupplier - the Supplier which is to supply the Thread in which the method is supposed to be running. If the returned value differs from the Thread in which the method is actually running (i.e. Thread.currentThread()), the method returns. In other words, threadSupplier is one of the ways to break out of the loop implemented by the method. The threadSupplier is called while syncRoot is acquired.
selectorSupplier - the Supplier which is to supply the Selector on which the method is to await changes in the states of channels in order to begin a subsequent loop iteration. If the returned Selector is not open, the method returns. In other words, selectorSupplier is another way to break out of the loop implemented by the method. The selectorSupplier is called while syncRoot is acquired.
selectionKeyOps - the SelectionKey operation-set bits which identify the states of channels whose changes trigger new loop iterations
channels - the (set of) SelectableChannels on each of which predicate is to be continually tested. A loop iteration is triggered when a state identified by selectionKeyOps changes on at least one of channels.
predicate - the BiPredicate which is to be continually tested on channels. A loop iteration is triggered when a state identified by selectionKeyOps changes on at least one of channels. BiPredicate.test(Object, Object) is supplied with an element of channels and its (automatically) associated SelectionKey in the Selector returned by selectorSupplier. The SelectionKey is provided in case, for example, the implementation of predicate chooses to associate additional state with the SelectableChannel (through SelectionKey.attach(Object)) available throughout the whole loop.

private static void scheduleAccept(MuxingServerSocketChannel channel) throws IOException
Schedules a specific MuxingServerSocketChannel for acceptance of incoming network connections in acceptThread.
Parameters:
channel - the MuxingServerSocketChannel to schedule for acceptance of incoming network connections in acceptThread
Throws:
IOException - if an I/O error occurs

public SocketChannel accept() throws IOException
Forwards to BaseDelegatingServerSocketChannel.delegate.
Overrides:
accept in class BaseDelegatingServerSocketChannel<ServerSocketChannel>
Throws:
IOException
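
The loop that runInSelectorThread documents above might look roughly like the following. This is a sketch under assumptions, not the class's actual implementation: the class SelectorLoopSketch, the methods runLoop and demo, and the constant SELECT_TIMEOUT_MS with its value are hypothetical; a predicate result of true is taken to mean that the channel needs no further testing; and the waiting on syncRoot that the parameter documentation hints at is omitted. The demo uses a java.nio.channels.Pipe so that no network access is needed.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiPredicate;
import java.util.function.Supplier;

final class SelectorLoopSketch {
    /** Hypothetical precautionary timeout, in the spirit of SELECTOR_SELECT_TIMEOUT. */
    static final long SELECT_TIMEOUT_MS = 100;

    static <T extends SelectableChannel> void runLoop(
            Object syncRoot,
            Supplier<Thread> threadSupplier,
            Supplier<Selector> selectorSupplier,
            int selectionKeyOps,
            Iterable<T> channels,
            BiPredicate<T, SelectionKey> predicate) throws IOException {
        while (true) {
            Selector selector;
            synchronized (syncRoot) {
                // Exit 1: this thread is no longer the designated one.
                if (threadSupplier.get() != Thread.currentThread()) return;
                // Exit 2: the selector is gone or closed.
                selector = selectorSupplier.get();
                if (selector == null || !selector.isOpen()) return;

                // channels must be non-blocking and its iterator must support remove().
                for (Iterator<T> it = channels.iterator(); it.hasNext();) {
                    T ch = it.next();
                    SelectionKey key = ch.keyFor(selector);
                    if (key == null) key = ch.register(selector, selectionKeyOps);
                    if (predicate.test(ch, key)) { // assumed: true means "done with ch"
                        key.cancel();
                        it.remove();
                    }
                }
            }
            // Wait for a state change, but never longer than the timeout.
            selector.select(SELECT_TIMEOUT_MS);
            selector.selectedKeys().clear();
        }
    }

    /** Feeds three bytes through a Pipe and returns how many the predicate read. */
    static int demo() {
        try {
            Pipe pipe = Pipe.open();
            pipe.source().configureBlocking(false);
            pipe.sink().write(ByteBuffer.wrap(new byte[] {1, 2, 3}));

            Selector selector = Selector.open();
            List<Pipe.SourceChannel> channels = new ArrayList<>();
            channels.add(pipe.source());
            AtomicReference<Thread> owner = new AtomicReference<>(Thread.currentThread());
            AtomicInteger total = new AtomicInteger();

            runLoop(new Object(), owner::get, () -> selector, SelectionKey.OP_READ, channels,
                (ch, key) -> {
                    try {
                        int n = ch.read(ByteBuffer.allocate(8));
                        if (n == 0) return false; // nothing yet, keep testing
                        if (n > 0) total.addAndGet(n);
                    } catch (IOException e) {
                        // Treat a failed read like end of stream.
                    }
                    owner.set(null); // makes runLoop return on its next iteration
                    return true;
                });

            selector.close();
            pipe.sink().close();
            pipe.source().close();
            return total.get();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("bytes read: " + demo());
    }
}
```

Passing the thread and the selector through suppliers, rather than as plain values, is what lets other code stop the loop simply by replacing the thread reference or closing the selector.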

protected void addMuxServerSocketChannel(MuxServerSocketChannel channel)
Adds a specific MuxServerSocketChannel to the list of MuxServerSocketChannels created by and delegating to this instance.
Parameters:
channel - the MuxServerSocketChannel to add

private void closeAbandonedSocketChannels()
Weeds out SocketChannels which were classified/filtered into a MuxServerSocketChannel but were not accepted (out of it) for a long time.

protected MuxServerSocketChannel createMuxServerSocketChannel(DatagramPacketFilter filter)
Initializes a new MuxServerSocketChannel instance which is to delegate to this instance and is to demultiplex incoming network connections and packets using a specific DatagramPacketFilter.
Parameters:
filter - the DatagramPacketFilter to be used by the new MuxServerSocketChannel instance to demultiplex incoming network connections and packets
Returns:
a new MuxServerSocketChannel instance which delegates to this instance and demultiplexes incoming network connections and packets using the specified filter

private boolean filterAccept(DatagramPacket p, SocketChannel channel)
Determines whether any of the MuxServerSocketChannels created by and delegating to this instance demultiplexes (i.e. recognizes) a specific SocketChannel based on a specific DatagramPacket read from it and will make it available for acceptance.
Parameters:
p - the DatagramPacket read from channel which is to be analyzed by the MuxServerSocketChannels created by and delegating to this instance
channel - the SocketChannel from which p was read and which is to possibly be demultiplexed into a MuxServerSocketChannel
Returns:
true if one of the MuxServerSocketChannels created by and delegating to this instance demultiplexed the specified channel; otherwise, false
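
To make the filtering step above more concrete, here is a small sketch of first-match demultiplexing on the initial bytes of a connection. Everything named here is an assumption for illustration: PacketFilter is a hypothetical stand-in for DatagramPacketFilter (assumed to expose a single accept(DatagramPacket) test), the consumers are plain names rather than MuxServerSocketChannels, and the two example filters (an HTTP "GET " prefix and the TLS handshake record type byte 0x16) are not taken from this class.

```java
import java.net.DatagramPacket;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

final class FilterDemuxSketch {
    /** Hypothetical stand-in for DatagramPacketFilter. */
    interface PacketFilter {
        boolean accept(DatagramPacket p);
    }

    /** Returns the name of the first consumer whose filter accepts p, or null if none does. */
    static String firstMatch(Map<String, PacketFilter> consumers, DatagramPacket p) {
        for (Map.Entry<String, PacketFilter> e : consumers.entrySet()) {
            if (e.getValue().accept(p)) return e.getKey();
        }
        return null;
    }

    /** Tests whether the packet payload begins with the given ASCII prefix. */
    static boolean startsWith(DatagramPacket p, String prefix) {
        byte[] want = prefix.getBytes(StandardCharsets.US_ASCII);
        if (p.getLength() < want.length) return false;
        byte[] data = p.getData();
        int off = p.getOffset();
        for (int i = 0; i < want.length; i++) {
            if (data[off + i] != want[i]) return false;
        }
        return true;
    }

    /** Wraps the first bytes of a connection in a DatagramPacket and classifies them. */
    static String classify(String firstBytes) {
        Map<String, PacketFilter> consumers = new LinkedHashMap<>();
        consumers.put("http", p -> startsWith(p, "GET "));
        consumers.put("tls", p -> p.getLength() > 0 && p.getData()[p.getOffset()] == 0x16);

        byte[] bytes = firstBytes.getBytes(StandardCharsets.ISO_8859_1);
        return firstMatch(consumers, new DatagramPacket(bytes, bytes.length));
    }

    public static void main(String[] args) {
        System.out.println(classify("GET / HTTP/1.1\r\n")); // prints "http"
        System.out.println(classify("\u0016\u0003\u0001")); // prints "tls"
        System.out.println(classify("hello"));              // prints "null"
    }
}
```

A channel whose first bytes match no filter stays unclassified, which corresponds to the false result described above.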

protected SocketChannel implAccept(SocketChannel accepted) throws IOException
Allows extenders to optionally configure a SocketChannel which has been accepted by BaseDelegatingServerSocketChannel.delegate before it is returned by BaseDelegatingServerSocketChannel.accept().
Queues a SocketChannel accepted by this instance for reading so that it may later on be demultiplexed into a MuxServerSocketChannel.
Overrides:
implAccept in class BaseDelegatingServerSocketChannel<ServerSocketChannel>
Parameters:
accepted - the SocketChannel accepted by delegate
Returns:
the SocketChannel to be returned by BaseDelegatingServerSocketChannel.accept() (in place of accepted)
Throws:
IOException - if an I/O error occurs

protected MuxingServerSocket implSocket(ServerSocket socket) throws IOException
Allows extenders to optionally configure the ServerSocket of BaseDelegatingServerSocketChannel.delegate before it is returned by BaseDelegatingServerSocketChannel.socket().
Associates a MuxingServerSocket with this MuxingServerSocketChannel.
Overrides:
implSocket in class BaseDelegatingServerSocketChannel<ServerSocketChannel>
Parameters:
socket - the ServerSocket of delegate
Returns:
the ServerSocket to be returned by BaseDelegatingServerSocketChannel.socket() (in place of socket)
Throws:
IOException - if an I/O error occurs

protected int maybeRead(SocketChannel channel, ByteBuffer buf)
Attempts to read from a specific SocketChannel into a specific ByteBuffer without throwing an IOException if the reading from the channel fails or there is insufficient room in buf to write into.
Parameters:
channel - the SocketChannel to read from
buf - the ByteBuffer to write into
Returns:
the number of bytes read from channel and written into buf or -1 if channel has reached the end of its stream

protected void muxServerSocketChannelAdded(MuxServerSocketChannel channel)
Notifies this MuxingServerSocketChannel that a specific MuxServerSocketChannel was added to muxServerSocketChannels.
Parameters:
channel - the added MuxServerSocketChannel

protected void runInReadThread()
Runs in readThread and reads from all SocketChannels in readQ and serves them for demultiplexing to muxServerSocketChannels.

protected void scheduleRead(SocketChannel channel)
Queues a specific SocketChannel to be read and demultiplexed into a MuxServerSocketChannel.
Parameters:
channel - the SocketChannel to queue for reading and demultiplexing

private boolean testRunInReadThreadPredicate(SocketChannel ch, SelectionKey sk)
Implements BiPredicate.test(Object, Object) of the BiPredicate utilized by runInReadThread(). The method is defined explicitly to reduce excessive indentation and improve readability. Reads from ch and attempts to classify/filter it into a MuxServerSocketChannel for acceptance. If ch has not provided readable data within SOCKET_CHANNEL_READ_TIMEOUT, it is forcibly closed.
Parameters:
ch - the SocketChannel to read from and to classify/filter into a MuxServerSocketChannel for acceptance
sk - the SelectionKey associated with ch in the Selector which awaits changes in the state(s) of ch
Returns:
true if ch is to no longer be tested; otherwise, false
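
The read-with-timeout behaviour described for testRunInReadThreadPredicate can be sketched as below. This is an illustration under stated assumptions rather than the method's real body: ReadTimeoutSketch, State, test, demo and the READ_TIMEOUT_MS value are hypothetical; per-channel state is kept in the SelectionKey attachment; the current time is passed in explicitly so the behaviour is deterministic; and a failed read is treated like end of stream. The classification step itself is left out.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

final class ReadTimeoutSketch {
    /** Hypothetical value playing the role of SOCKET_CHANNEL_READ_TIMEOUT. */
    static final long READ_TIMEOUT_MS = 50;

    /** Per-channel state carried in the SelectionKey attachment. */
    static final class State {
        final long firstSeenMs;
        final ByteBuffer buf = ByteBuffer.allocate(16);

        State(long firstSeenMs) {
            this.firstSeenMs = firstSeenMs;
        }
    }

    /** Returns true once ch needs no further testing (data arrived, stream ended, or timed out). */
    static boolean test(ReadableByteChannel ch, SelectionKey key, long nowMs) {
        State st = (State) key.attachment();
        if (st == null) {
            st = new State(nowMs);
            key.attach(st);
        }

        int read;
        try {
            read = ch.read(st.buf);
        } catch (IOException e) {
            read = -1; // assumption: a failed read is handled like end of stream
        }

        if (read > 0) return true; // data arrived; a real implementation would classify it now
        if (read < 0 || nowMs - st.firstSeenMs >= READ_TIMEOUT_MS) {
            try {
                ch.close(); // abandoned or broken: forcibly close
            } catch (IOException ignored) {
                // Nothing useful can be done if closing fails.
            }
            return true;
        }
        return false; // no data yet and still within the timeout
    }

    /** Tests a silent channel at time 0 and again at the timeout. */
    static String demo() {
        try {
            Pipe pipe = Pipe.open();
            Pipe.SourceChannel source = pipe.source();
            source.configureBlocking(false);
            Selector selector = Selector.open();
            SelectionKey key = source.register(selector, SelectionKey.OP_READ);

            boolean first = test(source, key, 0);
            boolean second = test(source, key, READ_TIMEOUT_MS);
            String result = first + "," + second + "," + source.isOpen();

            selector.close();
            pipe.sink().close();
            return result;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "false,true,false"
    }
}
```

Keeping the timestamp in the key attachment means no separate map from channels to deadlines has to be maintained alongside the selector.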
Copyright © 2018. All rights reserved.