Continuous socket connection can be crucial to ensure correct app behavior. Whether it’s delivering real-time chat updates, stock prices, or in-app indicators, a reliable connection is vital.
One of the irritating problems with sockets is a sudden loss of connection. If the true cause is not visibly evident, i.e., unstable internet connection, then the disruption cause is often well hidden. To tackle this issue we can implement an automatic socket reconnection strategy. Let’s see what options we have in the industry-standard socket library for Dart — web_socket_channel.
The Classic Approach
The example from the library is pretty much straightforward:
import 'package:web_socket_channel/web_socket_channel.dart';
import 'package:web_socket_channel/status.dart' as status;
main() async {
final wsUrl = Uri.parse('ws://localhost:1234')
var channel = WebSocketChannel.connect(wsUrl);
channel.stream.listen((message) {
channel.sink.add('received!');
channel.sink.close(status.goingAway);
});
}
Unfortunately, WebSocketChannel
doesn’t offer built-in configuration options for handling reconnection. Hence, we need to manually react to the stream errors. Let’s imitate the sudden error from the WebSocket. Here’s how you can catch errors in the stream listener:
channel.stream.listen(
(message) {
channel.sink.add('received!');
},
onError: (error) {
// Handle error here
},
onDone: () {
// Handle socket disruption
},
);
The typical solution would be to call WebSocketChannel.connect
again and override the stream in the callback.
onDone: () {
channel = WebSocketChannel.connect(Uri.parse(url));
stream = channel.stream.listen(
...
);
},
While this approach works, it can become cumbersome in a production application with a well-structured architecture.
The Clean Architecture Solution
A typical app architecture consists of different layers, classes, and zones of responsibility. Let’s look at the clean architecture example:
Sockets are often initialized in the data layer and used in the presentation layer. Ideally, the presentation layer shouldn’t know about the socket's inner work including whether the socket is trying to reconnect.
However, the previous approach forces us to manage reconnection logic in the presentation layer. So, instead of overwhelming the UI layer with data responsibilities, we can consolidate the reconnection logic to the data layer, and even better, within a single class.
The Two-Stream Strategy
The idea is to utilize two streams: an inner stream that maintains a connection to the socket and an outer stream that serves as the entry point for other classes, all while preserving the connection. The inner stream is responsible for handling errors and reconnections, while the outer stream remains untouched, waiting for data from the inner stream.
Implementation: The SocketChannel Class
Let’s dive into the implementation of the SocketChannel
class, which will handle our reconnection logic. We’ll start by passing the socket configuration to this class:
SocketChannel getChannel() {
return SocketChannel(
() => IOWebSocketChannel.connect(
'ws://localhost:1234',
),
);
}
The SocketChannel
class will handle subscriptions, reconnection, message sending, and data streaming. We also need a sink to pass messages and IOWebSocketChannel
itself, which will be extracted from the constructor parameter.
As discussed before we are going to implement an inner stream and an outer stream, and the latter will be presented by BehaviorSubject
from the rxdart library. Thus, every time someone connects to our socket class, they will get the latest data from the socket.
class SocketChannel {
SocketChannel(this._getIOWebSocketChannel) {
_startConnection();
}
final IOWebSocketChannel Function() _getIOWebSocketChannel;
late IOWebSocketChannel _ioWebSocketChannel;
WebSocketSink get _sink => _ioWebSocketChannel.sink;
late Stream<dynamic> _innerStream;
final _outerStreamSubject = BehaviorSubject<dynamic>();
Stream<dynamic> get stream => _outerStreamSubject.stream;
}
Now, let’s add the _startConnection()
method to initiate the socket connection from the constructor:
void _startConnection() {
_ioWebSocketChannel = _getIOWebSocketChannel();
_innerStream = _ioWebSocketChannel.stream;
_innerStream.listen(
(event) {
// Forward data to outer stream
_outerStreamSubject.add(event);
},
onError: (error) {
// Handle web socket connection error
_handleLostConnection();
},
onDone: () {
// Handle web socket connection break
_handleLostConnection();
},
);
}
void _handleLostConnection() {
_startConnection();
}
Improved Reconnection Logic
To enhance our solution, let’s address the scenario where a socket fails to reconnect immediately. For instance, if the internet connection is lost for a few minutes, we can implement a ping mechanism to check the server’s status periodically. The first reconnection attempt should occur immediately after the initial connection break, with subsequent attempts being delayed.
bool _isFirstRestart = false;
bool _isFollowingRestart = false;
void _handleLostConnection() {
if (_isFirstRestart && !_isFollowingRestart) {
Future.delayed(const Duration(seconds: 3), () {
_isFollowingRestart = false;
_startConnection();
});
_isFollowingRestart = true;
} else {
_isFirstRestart = true;
_startConnection();
}
}
Finally, we can add a close()
method to close the socket. Closing the sink will trigger the onDone
callback, so we need to set the flag _isManuallyClose = true
inside the method and check it in the callback.
bool _isManuallyClosed = false;
void _startConnection() {
...
onDone: () {
if (!_isManuallyClosed) {
_handleLostConnection();
}
},
...
}
void close() {
_isManuallyClosed = true;
_sink.close();
}
Final result:
import 'package:rxdart/rxdart.dart';
import 'package:web_socket_channel/io.dart';
import 'package:web_socket_channel/web_socket_channel.dart';
class SocketChannel {
SocketChannel(this._getIOWebSocketChannel) {
_startConnection();
}
final IOWebSocketChannel Function() _getIOWebSocketChannel;
late IOWebSocketChannel _ioWebSocketChannel;
WebSocketSink get _sink => _ioWebSocketChannel.sink;
late Stream<dynamic> _innerStream;
final _outerStreamSubject = BehaviorSubject<dynamic>();
Stream<dynamic> get stream => _outerStreamSubject.stream;
bool _isFirstRestart = false;
bool _isFollowingRestart = false;
bool _isManuallyClosed = false;
void _handleLostConnection() {
if (_isFirstRestart && !_isFollowingRestart) {
Future.delayed(const Duration(seconds: 3), () {
_isFollowingRestart = false;
_startConnection();
});
_isFollowingRestart = true;
} else {
_isFirstRestart = true;
_startConnection();
}
}
void _startConnection() {
_ioWebSocketChannel = _getIOWebSocketChannel();
_innerStream = _ioWebSocketChannel.stream;
_innerStream.listen(
(event) {
_isFirstRestart = false;
_outerStreamSubject.add(event);
},
onError: (error) {
_handleLostConnection();
},
onDone: () {
if (!_isManuallyClosed) {
_handleLostConnection();
}
},
);
}
void sendMessage(String message) => _sink.add(message);
void close() {
_isManuallyClosed = true;
_sink.close();
}
}
Conclusion
In this article, we explored socket reconnection in Flutter applications and implemented a clean and efficient solution using the SocketChannel
class. By encapsulating reconnection logic within the data layer, we can keep our presentation layer clean. With the added feature of delayed reconnections, we've built a foundation for maintaining continuous socket connections.