2022-06-08 10:36:39 +00:00
import { _ _assign , _ _extends } from "tslib" ;
2022-01-21 08:28:41 +00:00
import { Subject , AnonymousSubject } from '../../Subject' ;
import { Subscriber } from '../../Subscriber' ;
import { Observable } from '../../Observable' ;
import { Subscription } from '../../Subscription' ;
import { ReplaySubject } from '../../ReplaySubject' ;
/**
 * Baseline configuration that every non-derived WebSocketSubject starts from.
 * The constructor copies these fields into a fresh object before applying the
 * caller's overrides, so this object itself is never mutated by it.
 *
 * - url: endpoint to connect to; empty until the caller provides one.
 * - deserializer: receives the event handed to `socket.onmessage` and returns
 *   the value emitted to subscribers; by default JSON-parses `e.data`.
 * - serializer: receives a value passed to `next` and returns what is handed
 *   to `socket.send`; by default JSON-stringifies it.
 */
var DEFAULT_WEBSOCKET_CONFIG = {
    url: '',
    deserializer: function (e) { return JSON.parse(e.data); },
    serializer: function (value) { return JSON.stringify(value); },
};
// Message of the TypeError emitted when WebSocketSubject#error is invoked with
// a value that has no truthy `code` property.
var WEBSOCKETSUBJECT_INVALID_ERROR_OBJECT = 'WebSocketSubject.error must be called with an object with an error code, and an optional reason: { code: number, reason: string }';
2022-06-08 10:36:39 +00:00
/**
 * Subject wrapper around a WebSocket.
 *
 * Values pushed with `next` are serialized and sent over the socket once it is
 * open (until then they are buffered in a ReplaySubject held in
 * `destination`); messages received from the socket are deserialized and
 * emitted to subscribers through the internal `_output` Subject.
 *
 * The socket is created on the first subscription and closed when the last
 * subscriber of `_output` goes away.
 */
var WebSocketSubject = (function (_super) {
    __extends(WebSocketSubject, _super);
    /**
     * @param urlConfigOrSource Either a URL string, a configuration object
     *   whose own enumerable properties override DEFAULT_WEBSOCKET_CONFIG, or
     *   an Observable to use as `source` (the form used for derived subjects).
     * @param destination Only used when `urlConfigOrSource` is an Observable;
     *   stored as this subject's `destination`.
     * @throws {Error} When no `WebSocketCtor` is configured and no global
     *   `WebSocket` is available.
     */
    function WebSocketSubject(urlConfigOrSource, destination) {
        var _this = _super.call(this) || this;
        _this._socket = null;
        if (urlConfigOrSource instanceof Observable) {
            _this.destination = destination;
            _this.source = urlConfigOrSource;
        }
        else {
            // Copy the defaults so per-instance overrides never leak into them.
            var config = (_this._config = __assign({}, DEFAULT_WEBSOCKET_CONFIG));
            _this._output = new Subject();
            if (typeof urlConfigOrSource === 'string') {
                config.url = urlConfigOrSource;
            }
            else {
                for (var key in urlConfigOrSource) {
                    // Borrow hasOwnProperty so config objects without a
                    // prototype (Object.create(null)) are accepted too.
                    if (Object.prototype.hasOwnProperty.call(urlConfigOrSource, key)) {
                        config[key] = urlConfigOrSource[key];
                    }
                }
            }
            if (!config.WebSocketCtor) {
                // `typeof` is required here: evaluating a bare, undeclared
                // `WebSocket` identifier would raise a ReferenceError before
                // the descriptive error below could be thrown.
                if (typeof WebSocket === 'undefined' || !WebSocket) {
                    throw new Error('no WebSocket constructor can be found');
                }
                config.WebSocketCtor = WebSocket;
            }
            // Buffers values passed to `next` until the socket has opened.
            _this.destination = new ReplaySubject();
        }
        return _this;
    }
    /**
     * Creates a derived WebSocketSubject that shares this subject's config and
     * destination, uses this subject as its `source`, and carries `operator`.
     */
    WebSocketSubject.prototype.lift = function (operator) {
        var sock = new WebSocketSubject(this._config, this.destination);
        sock.operator = operator;
        sock.source = this;
        return sock;
    };
    /**
     * Forgets the current socket and installs a fresh output Subject. For a
     * subject without a `source`, the outgoing buffer is replaced as well.
     */
    WebSocketSubject.prototype._resetState = function () {
        this._socket = null;
        if (!this.source) {
            this.destination = new ReplaySubject();
        }
        this._output = new Subject();
    };
    /**
     * Builds an Observable for one logical stream carried over this socket.
     *
     * On subscription the result of `subMsg()` is sent through `next`; on
     * teardown the result of `unsubMsg()` is sent. Only incoming values for
     * which `messageFilter(value)` is truthy are forwarded.
     *
     * @param subMsg Function producing the message sent on subscribe.
     * @param unsubMsg Function producing the message sent on unsubscribe.
     * @param messageFilter Predicate applied to every incoming value.
     * @returns An Observable of the values accepted by `messageFilter`.
     */
    WebSocketSubject.prototype.multiplex = function (subMsg, unsubMsg, messageFilter) {
        var self = this;
        return new Observable(function (observer) {
            try {
                self.next(subMsg());
            }
            catch (err) {
                observer.error(err);
            }
            var subscription = self.subscribe({
                next: function (x) {
                    try {
                        if (messageFilter(x)) {
                            observer.next(x);
                        }
                    }
                    catch (err) {
                        observer.error(err);
                    }
                },
                error: function (err) { return observer.error(err); },
                complete: function () { return observer.complete(); },
            });
            return function () {
                try {
                    self.next(unsubMsg());
                }
                catch (err) {
                    observer.error(err);
                }
                subscription.unsubscribe();
            };
        });
    };
    /**
     * Creates the socket from the current config and wires its event handlers
     * to `_output` (incoming side) and `destination` (outgoing side).
     * Construction failures are reported as an error on `_output`.
     */
    WebSocketSubject.prototype._connectSocket = function () {
        var _this = this;
        var _a = this._config, WebSocketCtor = _a.WebSocketCtor, protocol = _a.protocol, url = _a.url, binaryType = _a.binaryType;
        // Captured once: handlers below keep reporting to the Subject that was
        // current when this socket was created, even after _resetState().
        var observer = this._output;
        var socket = null;
        try {
            socket = protocol ? new WebSocketCtor(url, protocol) : new WebSocketCtor(url);
            this._socket = socket;
            if (binaryType) {
                this._socket.binaryType = binaryType;
            }
        }
        catch (e) {
            observer.error(e);
            return;
        }
        // Teardown: detach the socket and close it if it is open
        // (readyState 1 is OPEN in the WebSocket API).
        var subscription = new Subscription(function () {
            _this._socket = null;
            if (socket && socket.readyState === 1) {
                socket.close();
            }
        });
        socket.onopen = function (evt) {
            var _socket = _this._socket;
            if (!_socket) {
                // The subject was torn down while the socket was connecting.
                socket.close();
                _this._resetState();
                return;
            }
            var openObserver = _this._config.openObserver;
            if (openObserver) {
                openObserver.next(evt);
            }
            // Swap the buffering destination for one that writes to the
            // socket, then replay whatever was buffered into it.
            var queue = _this.destination;
            _this.destination = Subscriber.create(function (x) {
                if (socket.readyState === 1) {
                    try {
                        var serializer = _this._config.serializer;
                        socket.send(serializer(x));
                    }
                    catch (e) {
                        _this.destination.error(e);
                    }
                }
            }, function (err) {
                var closingObserver = _this._config.closingObserver;
                if (closingObserver) {
                    closingObserver.next(undefined);
                }
                if (err && err.code) {
                    socket.close(err.code, err.reason);
                }
                else {
                    observer.error(new TypeError(WEBSOCKETSUBJECT_INVALID_ERROR_OBJECT));
                }
                _this._resetState();
            }, function () {
                var closingObserver = _this._config.closingObserver;
                if (closingObserver) {
                    closingObserver.next(undefined);
                }
                socket.close();
                _this._resetState();
            });
            if (queue && queue instanceof ReplaySubject) {
                subscription.add(queue.subscribe(_this.destination));
            }
        };
        socket.onerror = function (e) {
            _this._resetState();
            observer.error(e);
        };
        socket.onclose = function (e) {
            // Only reset when this is still the active socket, so a newer
            // connection created in the meantime is left untouched.
            if (socket === _this._socket) {
                _this._resetState();
            }
            var closeObserver = _this._config.closeObserver;
            if (closeObserver) {
                closeObserver.next(e);
            }
            if (e.wasClean) {
                observer.complete();
            }
            else {
                observer.error(e);
            }
        };
        socket.onmessage = function (e) {
            try {
                var deserializer = _this._config.deserializer;
                observer.next(deserializer(e));
            }
            catch (err) {
                observer.error(err);
            }
        };
    };
    /**
     * Subscribes `subscriber` to incoming messages, connecting the socket
     * first if there is none. A derived subject delegates to its `source`.
     * When the last `_output` observer is removed, a connecting or open socket
     * (readyState 0 or 1) is closed and the state is reset.
     */
    WebSocketSubject.prototype._subscribe = function (subscriber) {
        var _this = this;
        var source = this.source;
        if (source) {
            return source.subscribe(subscriber);
        }
        if (!this._socket) {
            this._connectSocket();
        }
        this._output.subscribe(subscriber);
        subscriber.add(function () {
            var _socket = _this._socket;
            if (_this._output.observers.length === 0) {
                if (_socket && (_socket.readyState === 1 || _socket.readyState === 0)) {
                    _socket.close();
                }
                _this._resetState();
            }
        });
        return subscriber;
    };
    /**
     * Closes a connecting or open socket, resets internal state, and then runs
     * the parent class's `unsubscribe`.
     */
    WebSocketSubject.prototype.unsubscribe = function () {
        var _socket = this._socket;
        if (_socket && (_socket.readyState === 1 || _socket.readyState === 0)) {
            _socket.close();
        }
        this._resetState();
        _super.prototype.unsubscribe.call(this);
    };
    return WebSocketSubject;
}(AnonymousSubject));
export { WebSocketSubject } ;
2022-06-08 10:36:39 +00:00
//# sourceMappingURL=WebSocketSubject.js.map