// 2022-01-21 08:28:41 +00:00
import { Subject , AnonymousSubject } from '../../Subject' ;
import { Subscriber } from '../../Subscriber' ;
import { Observable } from '../../Observable' ;
import { Subscription } from '../../Subscription' ;
import { ReplaySubject } from '../../ReplaySubject' ;
/**
 * Baseline configuration copied into every directly-constructed subject
 * before caller-supplied options are applied.
 *
 * - `url`: empty string until the caller provides one.
 * - `deserializer`: parses the `data` property of an incoming message event as JSON.
 * - `serializer`: converts an outgoing value to a JSON string.
 */
const DEFAULT_WEBSOCKET_CONFIG = {
  url: '',
  deserializer: (e) => JSON.parse(e.data),
  serializer: (value) => JSON.stringify(value),
};

/**
 * Message of the TypeError emitted when the subject's error path is invoked
 * with a value that has no truthy `code` property.
 */
const WEBSOCKETSUBJECT_INVALID_ERROR_OBJECT = 'WebSocketSubject.error must be called with an object with an error code, and an optional reason: { code: number, reason: string }';
/**
 * Subject that sends values over a WebSocket and emits the messages it
 * receives. Values pushed before the socket opens go to a ReplaySubject that
 * is subscribed to the socket-writing destination once the socket is open.
 */
export class WebSocketSubject extends AnonymousSubject {
  /**
   * @param {string|Object|Observable} urlConfigOrSource
   *   - an Observable: stored as `source`, paired with `destination`;
   *   - a string: used as the socket URL;
   *   - otherwise: an object whose own enumerable properties override the
   *     default configuration.
   * @param {Object} [destination] Only used when `urlConfigOrSource` is an
   *   Observable.
   * @throws {Error} When no `WebSocketCtor` is configured and no usable
   *   global `WebSocket` exists.
   */
  constructor(urlConfigOrSource, destination) {
    super();
    this._socket = null;
    if (urlConfigOrSource instanceof Observable) {
      this.destination = destination;
      this.source = urlConfigOrSource;
    } else {
      // Copy the defaults so per-instance overrides never touch the shared object.
      const config = (this._config = Object.assign({}, DEFAULT_WEBSOCKET_CONFIG));
      this._output = new Subject();
      if (typeof urlConfigOrSource === 'string') {
        config.url = urlConfigOrSource;
      } else {
        for (const key in urlConfigOrSource) {
          // Call through Object.prototype so null-prototype configs and
          // configs that shadow `hasOwnProperty` are handled.
          if (Object.prototype.hasOwnProperty.call(urlConfigOrSource, key)) {
            config[key] = urlConfigOrSource[key];
          }
        }
      }
      // `typeof` guard: referencing an undeclared global `WebSocket` directly
      // would throw a ReferenceError instead of reaching the Error below.
      if (!config.WebSocketCtor && typeof WebSocket !== 'undefined' && WebSocket) {
        config.WebSocketCtor = WebSocket;
      } else if (!config.WebSocketCtor) {
        throw new Error('no WebSocket constructor can be found');
      }
      this.destination = new ReplaySubject();
    }
  }

  /**
   * Creates a new WebSocketSubject from this instance's configuration, with
   * the given operator attached and this instance as its source.
   *
   * @param {Object} operator Operator stored on the new subject.
   * @returns {WebSocketSubject} The newly created subject.
   */
  lift(operator) {
    const sock = new WebSocketSubject(this._config, this.destination);
    sock.operator = operator;
    sock.source = this;
    return sock;
  }

  /**
   * Drops the current socket reference and replaces the output subject.
   * When this instance has no `source`, the destination is also replaced by a
   * fresh ReplaySubject.
   */
  _resetState() {
    this._socket = null;
    if (!this.source) {
      this.destination = new ReplaySubject();
    }
    this._output = new Subject();
  }

  /**
   * Builds an Observable that shares this subject's connection.
   *
   * On subscription it sends `subMsg()` through this subject, then forwards
   * every message for which `messageFilter` returns a truthy value. On
   * teardown it sends `unsubMsg()` and unsubscribes from this subject.
   *
   * @param {Function} subMsg Produces the message sent on subscription.
   * @param {Function} unsubMsg Produces the message sent on teardown.
   * @param {Function} messageFilter Predicate selecting messages to forward.
   * @returns {Observable} Observable of the filtered messages.
   */
  multiplex(subMsg, unsubMsg, messageFilter) {
    const self = this;
    return new Observable((observer) => {
      try {
        self.next(subMsg());
      } catch (err) {
        // The error is reported, but subscription setup below still proceeds.
        observer.error(err);
      }
      const subscription = self.subscribe({
        next: (x) => {
          try {
            if (messageFilter(x)) {
              observer.next(x);
            }
          } catch (err) {
            observer.error(err);
          }
        },
        error: (err) => observer.error(err),
        complete: () => observer.complete(),
      });
      return () => {
        try {
          self.next(unsubMsg());
        } catch (err) {
          observer.error(err);
        }
        subscription.unsubscribe();
      };
    });
  }

  /**
   * Opens the socket described by `this._config` and wires its events to the
   * current output subject. A failure while constructing the socket is
   * reported through the output subject's error channel.
   */
  _connectSocket() {
    const { WebSocketCtor, protocol, url, binaryType } = this._config;
    // Captured once: handlers below keep reporting to this output even if
    // `_resetState` later replaces `this._output`.
    const observer = this._output;
    let socket = null;
    try {
      socket = protocol ? new WebSocketCtor(url, protocol) : new WebSocketCtor(url);
      this._socket = socket;
      if (binaryType) {
        this._socket.binaryType = binaryType;
      }
    } catch (e) {
      observer.error(e);
      return;
    }
    // NOTE(review): this subscription only receives the queue subscription
    // added in `onopen`; it is not returned or stored by this method.
    const subscription = new Subscription(() => {
      this._socket = null;
      // readyState 1 is OPEN in the WebSocket API.
      if (socket && socket.readyState === 1) {
        socket.close();
      }
    });
    socket.onopen = (evt) => {
      const { _socket } = this;
      if (!_socket) {
        // The socket reference was cleared before the connection opened.
        socket.close();
        this._resetState();
        return;
      }
      const { openObserver } = this._config;
      if (openObserver) {
        openObserver.next(evt);
      }
      // Keep the pre-open destination so it can be drained into the socket.
      const queue = this.destination;
      this.destination = Subscriber.create(
        (x) => {
          // Values are sent only while the socket is OPEN (readyState 1).
          if (socket.readyState === 1) {
            try {
              const { serializer } = this._config;
              socket.send(serializer(x));
            } catch (e) {
              this.destination.error(e);
            }
          }
        },
        (err) => {
          const { closingObserver } = this._config;
          if (closingObserver) {
            closingObserver.next(undefined);
          }
          if (err && err.code) {
            socket.close(err.code, err.reason);
          } else {
            observer.error(new TypeError(WEBSOCKETSUBJECT_INVALID_ERROR_OBJECT));
          }
          this._resetState();
        },
        () => {
          const { closingObserver } = this._config;
          if (closingObserver) {
            closingObserver.next(undefined);
          }
          socket.close();
          this._resetState();
        }
      );
      if (queue && queue instanceof ReplaySubject) {
        subscription.add(queue.subscribe(this.destination));
      }
    };
    socket.onerror = (e) => {
      this._resetState();
      observer.error(e);
    };
    socket.onclose = (e) => {
      // Reset only if this socket is still the active one.
      if (socket === this._socket) {
        this._resetState();
      }
      const { closeObserver } = this._config;
      if (closeObserver) {
        closeObserver.next(e);
      }
      if (e.wasClean) {
        observer.complete();
      } else {
        observer.error(e);
      }
    };
    socket.onmessage = (e) => {
      try {
        const { deserializer } = this._config;
        observer.next(deserializer(e));
      } catch (err) {
        observer.error(err);
      }
    };
  }

  /**
   * Subscribes `subscriber` to this subject.
   *
   * With a `source`, the subscription is delegated to it. Otherwise a socket
   * is connected if none exists, the subscriber is attached to the output
   * subject, and a teardown is registered that closes the socket once the
   * output has no observers left.
   *
   * @param {Object} subscriber Subscriber to attach.
   * @returns {Object} The source's subscription, or `subscriber` itself.
   */
  _subscribe(subscriber) {
    const { source } = this;
    if (source) {
      return source.subscribe(subscriber);
    }
    if (!this._socket) {
      this._connectSocket();
    }
    this._output.subscribe(subscriber);
    subscriber.add(() => {
      const { _socket } = this;
      if (this._output.observers.length === 0) {
        // readyState 1 is OPEN and 0 is CONNECTING in the WebSocket API.
        if (_socket && (_socket.readyState === 1 || _socket.readyState === 0)) {
          _socket.close();
        }
        this._resetState();
      }
    });
    return subscriber;
  }

  /**
   * Closes the socket if it is open or still connecting, resets internal
   * state, then performs the inherited unsubscribe.
   */
  unsubscribe() {
    const { _socket } = this;
    if (_socket && (_socket.readyState === 1 || _socket.readyState === 0)) {
      _socket.close();
    }
    this._resetState();
    super.unsubscribe();
  }
}
//# sourceMappingURL=WebSocketSubject.js.map