mirror of
https://github.com/fabioformosa/quartz-manager.git
synced 2026-01-01 15:13:16 +09:00
#8 clean up
This commit is contained in:
@@ -2,8 +2,7 @@ import { Injectable, OnInit } from '@angular/core';
|
||||
import { WebsocketService } from '.';
|
||||
import { SocketOption } from '../model/SocketOption.model';
|
||||
|
||||
Injectable()
|
||||
@Injectable()
|
||||
@Injectable()
|
||||
export class LogsWebsocketService extends WebsocketService {
|
||||
|
||||
constructor(){
|
||||
|
||||
@@ -2,8 +2,7 @@ import { Injectable, OnInit } from '@angular/core';
|
||||
import { WebsocketService } from '.';
|
||||
import { SocketOption } from '../model/SocketOption.model';
|
||||
|
||||
Injectable()
|
||||
@Injectable()
|
||||
@Injectable()
|
||||
export class ProgressWebsocketService extends WebsocketService {
|
||||
|
||||
constructor(){
|
||||
|
||||
@@ -1,9 +1,4 @@
|
||||
import { Injectable, OnInit } from '@angular/core';
|
||||
|
||||
import { Observable } from 'rxjs';
|
||||
|
||||
import { ApiService } from './api.service';
|
||||
|
||||
import { SocketEndpoint } from '../model/SocketEndpoint.model'
|
||||
|
||||
|
||||
@@ -38,10 +33,7 @@ export class WebsocketService {
|
||||
this.observableStompConnection = new Observable((observer) => {
|
||||
const subscriberIndex = this.subscriberIndex++;
|
||||
this.addToSubscribers({ index: subscriberIndex, observer });
|
||||
return () => {
|
||||
const index = subscriberIndex;
|
||||
this.removeFromSubscribers(index);
|
||||
};
|
||||
return () => this.removeFromSubscribers(subscriberIndex);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -50,13 +42,9 @@ export class WebsocketService {
|
||||
}
|
||||
|
||||
removeFromSubscribers = (index) => {
|
||||
let subscribeFromIndex;
|
||||
for (let i=0 ; i < this.subscribers.length; i++)
|
||||
if(i === index){
|
||||
subscribeFromIndex = this.subscribers[i];
|
||||
this.subscribers.splice(i, 1);
|
||||
break;
|
||||
}
|
||||
if(index > this.subscribers.length)
|
||||
throw new Error(`Unexpected error removing subscriber from websocket, because index ${index} is greater than subscriber length ${this.subscribers.length}`);
|
||||
this.subscribers.splice(index, 1);
|
||||
}
|
||||
|
||||
getObservable = () => {
|
||||
@@ -70,7 +58,7 @@ export class WebsocketService {
|
||||
out.headers = {};
|
||||
out.headers.messageId = data.headers["message-id"];
|
||||
|
||||
let messageIdIndex = this._messageIds.indexOf( out.headers.messageId);
|
||||
let messageIdIndex = this._messageIds.indexOf(out.headers.messageId);
|
||||
if ( messageIdIndex > -1) {
|
||||
out.self = true;
|
||||
this._messageIds = this._messageIds.splice(messageIdIndex, 1);
|
||||
@@ -80,24 +68,20 @@ export class WebsocketService {
|
||||
|
||||
_socketListener = (frame) => {
|
||||
console.log('Connected: ' + frame);
|
||||
this._socket.stomp.subscribe(this._options.topicName, (data) => {
|
||||
this.subscribers.forEach(subscriber => {
|
||||
subscriber.observer.next(this.getMessage(data));
|
||||
})
|
||||
})
|
||||
this._socket.stomp.subscribe(
|
||||
this._options.topicName,
|
||||
data => this.subscribers.forEach(subscriber => subscriber.observer.next(this.getMessage(data)))
|
||||
);
|
||||
}
|
||||
|
||||
_onSocketError = (errorMsg) => {
|
||||
let out: any = {};
|
||||
out.type = 'ERROR';
|
||||
out.message = errorMsg;
|
||||
this.subscribers.forEach(subscriber => {
|
||||
subscriber.observer.error(out);
|
||||
})
|
||||
this.subscribers.forEach(subscriber => subscriber.observer.error(out));
|
||||
this.scheduleReconnection();
|
||||
}
|
||||
|
||||
|
||||
scheduleReconnection = () => {
|
||||
this.reconnectionPromise = setTimeout(() => {
|
||||
console.log("Socket reconnecting... (if it fails, next attempt in " + this._options.reconnectionTimeout + " msec)");
|
||||
|
||||
Reference in New Issue
Block a user