#103 migrated the logs websocket to rx-stomp

This commit is contained in:
Fabio Formosa
2022-12-18 11:58:24 +01:00
parent a3b92443c4
commit ac63576704
8 changed files with 129 additions and 73 deletions

View File

@@ -27,7 +27,7 @@
"@fortawesome/fontawesome": "^1.1.4",
"@fortawesome/fontawesome-free-regular": "^5.0.8",
"@fortawesome/fontawesome-free-solid": "^5.0.8",
"@stomp/ng2-stompjs": "^0.6.3",
"@stomp/rx-stomp": "1.2.0",
"core-js": "2.5.1",
"hammerjs": "2.0.8",
"moment": "^2.29.1",
@@ -4229,19 +4229,19 @@
"dev": true,
"license": "MIT"
},
"node_modules/@stomp/ng2-stompjs": {
"version": "0.6.4",
"license": "MIT",
"node_modules/@stomp/rx-stomp": {
"version": "1.2.0",
"resolved": "https://registry.npmjs.org/@stomp/rx-stomp/-/rx-stomp-1.2.0.tgz",
"integrity": "sha512-QLzPe3q0EwLB+cVWdUFEO4z5tyR+kPnXJANKN2UvB7Spz/oViHF959cydmXdQWaK7NHp86VO54TgFfXbHVnSLg==",
"dependencies": {
"@stomp/stompjs": "^4.0.0 >=4.0.2"
"@stomp/stompjs": "^6.0.0 >=6.1.1",
"angular2-uuid": "^1.1.1"
}
},
"node_modules/@stomp/stompjs": {
"version": "4.0.8",
"license": "Apache-2.0",
"optionalDependencies": {
"websocket": "^1.0.24"
}
"node_modules/@stomp/rx-stomp/node_modules/@stomp/stompjs": {
"version": "6.1.2",
"resolved": "https://registry.npmjs.org/@stomp/stompjs/-/stompjs-6.1.2.tgz",
"integrity": "sha512-FHDTrIFM5Ospi4L3Xhj6v2+NzCVAeNDcBe95YjUWhWiRMrBF6uN3I7AUOlRgT6jU/2WQvvYK8ZaIxFfxFp+uHQ=="
},
"node_modules/@tootallnate/once": {
"version": "2.0.0",
@@ -5419,6 +5419,11 @@
"ajv": "^8.8.2"
}
},
"node_modules/angular2-uuid": {
"version": "1.1.1",
"resolved": "https://registry.npmjs.org/angular2-uuid/-/angular2-uuid-1.1.1.tgz",
"integrity": "sha512-6AXPyii9q8KBFGagybLNVmdGJLPcVZAhmv3odNGSJIA18LuJ3xOe6uN9GvjlQsGfdmYeuxlsGnFEUu7gPhkc+g=="
},
"node_modules/ansi-colors": {
"version": "4.1.3",
"resolved": "https://registry.npmjs.org/ansi-colors/-/ansi-colors-4.1.3.tgz",
@@ -22763,16 +22768,20 @@
"version": "3.1.0",
"dev": true
},
"@stomp/ng2-stompjs": {
"version": "0.6.4",
"@stomp/rx-stomp": {
"version": "1.2.0",
"resolved": "https://registry.npmjs.org/@stomp/rx-stomp/-/rx-stomp-1.2.0.tgz",
"integrity": "sha512-QLzPe3q0EwLB+cVWdUFEO4z5tyR+kPnXJANKN2UvB7Spz/oViHF959cydmXdQWaK7NHp86VO54TgFfXbHVnSLg==",
"requires": {
"@stomp/stompjs": "^4.0.0 >=4.0.2"
}
},
"@stomp/stompjs": {
"version": "4.0.8",
"requires": {
"websocket": "^1.0.24"
"@stomp/stompjs": "^6.0.0 >=6.1.1",
"angular2-uuid": "^1.1.1"
},
"dependencies": {
"@stomp/stompjs": {
"version": "6.1.2",
"resolved": "https://registry.npmjs.org/@stomp/stompjs/-/stompjs-6.1.2.tgz",
"integrity": "sha512-FHDTrIFM5Ospi4L3Xhj6v2+NzCVAeNDcBe95YjUWhWiRMrBF6uN3I7AUOlRgT6jU/2WQvvYK8ZaIxFfxFp+uHQ=="
}
}
},
"@tootallnate/once": {
@@ -23670,6 +23679,11 @@
"fast-deep-equal": "^3.1.3"
}
},
"angular2-uuid": {
"version": "1.1.1",
"resolved": "https://registry.npmjs.org/angular2-uuid/-/angular2-uuid-1.1.1.tgz",
"integrity": "sha512-6AXPyii9q8KBFGagybLNVmdGJLPcVZAhmv3odNGSJIA18LuJ3xOe6uN9GvjlQsGfdmYeuxlsGnFEUu7gPhkc+g=="
},
"ansi-colors": {
"version": "4.1.3",
"resolved": "https://registry.npmjs.org/ansi-colors/-/ansi-colors-4.1.3.tgz",

View File

@@ -30,7 +30,7 @@
"@fortawesome/fontawesome": "^1.1.4",
"@fortawesome/fontawesome-free-regular": "^5.0.8",
"@fortawesome/fontawesome-free-solid": "^5.0.8",
"@stomp/ng2-stompjs": "^0.6.3",
"@stomp/rx-stomp": "1.2.0",
"core-js": "2.5.1",
"hammerjs": "2.0.8",
"moment": "^2.29.1",

View File

@@ -62,36 +62,12 @@ import { APP_BASE_HREF } from '@angular/common';
import {SimpleTriggerConfigComponent} from './components/simple-trigger-config';
import JobService from './services/job.service';
import {GenericErrorComponent} from './views/error/genericError.component';
import {LogsRxWebsocketService} from './services/logs.rx-websocket.service';
export function initUserFactory(userService: UserService) {
return () => userService.fetchLoggedUser();
}
// const stompConfig: StompConfig = {
// // Which server?
// url: 'ws://localhost:8080/quartz-manager/progress',
// // Headers
// // Typical keys: login, passcode, host
// headers: {
// login: 'admin',
// passcode: 'admin'
// },
// // How often to heartbeat?
// // Interval in milliseconds, set to 0 to disable
// heartbeat_in: 0, // Typical value 0 - disabled
// heartbeat_out: 20000, // Typical value 20000 - every 20 seconds
// // Wait in milliseconds before attempting auto reconnect
// // Set to 0 to disable
// // Typical value 5000 (5 seconds)
// reconnect_delay: 5000,
// // Will log diagnostics on console
// debug: true
// };
export function jwtOptionsFactory(apiService: ApiService) {
return {
tokenGetter: () => {
@@ -169,18 +145,13 @@ export function jwtOptionsFactory(apiService: ApiService) {
JobService,
TriggerService,
ProgressWebsocketService,
LogsWebsocketService,
// LogsWebsocketService,
LogsRxWebsocketService,
AuthService,
ApiService,
UserService,
ConfigService,
MatIconRegistry
// StompService,
// ServerSocket
// {
// provide: StompConfig,
// useValue: stompConfig
// }
],
bootstrap: [AppComponent]
})

View File

@@ -1,42 +1,62 @@
import {Component, OnInit, Input, Output, EventEmitter} from '@angular/core';
import {Component, OnInit, Input, Output, EventEmitter, OnDestroy} from '@angular/core';
import {LogsWebsocketService, ApiService} from '../../services';
import {LogsWebsocketService, ApiService, getBaseUrl, CONTEXT_PATH, QuartzManagerWebsocketMessage} from '../../services';
import {Observable} from 'rxjs';
import {RxStompService, } from '../../services/rx-stomp.service';
import {RxStompConfig} from '@stomp/rx-stomp/esm6/rx-stomp-config';
import {LogsRxWebsocketService} from '../../services/logs.rx-websocket.service';
import {map} from 'rxjs/operators';
@Component({
selector: 'logs-panel',
templateUrl: './logs-panel.component.html',
styleUrls: ['./logs-panel.component.scss']
})
export class LogsPanelComponent implements OnInit {
export class LogsPanelComponent implements OnInit, OnDestroy {
MAX_LOGS = 30;
logs = new Array();
topicSubscription;
constructor(
private logsWebsocketService: LogsWebsocketService,
// private logsWebsocketService: LogsWebsocketService,
private logsRxWebsocketService: LogsRxWebsocketService,
private apiService: ApiService
) {
}
ngOnInit() {
const obs = this.logsWebsocketService.getObservable()
obs.subscribe({
'next': this.onNewLogMsg,
'error': (err) => {
console.log(err)
}
});
// const obs = this.logsWebsocketService.getObservable()
// obs.subscribe({
// 'next': this.onNewLogMsg,
// 'error': (err) => {
// console.log(err)
// }
// });
this.topicSubscription = this.logsRxWebsocketService.watch('/topic/logs')
.pipe(map(msg => JSON.parse(msg.body)))
.subscribe(this._showNewLog, (err) => {
console.log(err);
// TODO in case of 401
// this.apiService.get('/quartz-manager/session/refresh');
});
}
onNewLogMsg = (receivedMsg) => {
if (receivedMsg.type === 'SUCCESS') {
this._showNewLog(receivedMsg.message);
} else if (receivedMsg.type === 'ERROR') {
this._refreshSession();
} // if websocket has been closed for session expiration, try to refresh it
};
ngOnDestroy() {
this.topicSubscription.unsubscribe();
}
// onNewLogMsg = (receivedMsg) => {
// if (receivedMsg.body.type === 'SUCCESS') {
// this._showNewLog(receivedMsg.body.message);
// } else if (receivedMsg.body.type === 'ERROR') {
// this._refreshSession();
// } // if websocket has been closed for session expiration, try to refresh it
// };
_showNewLog = (logRecord) => {
if (this.logs.length > this.MAX_LOGS) {

View File

@@ -0,0 +1,16 @@
import { TestBed } from '@angular/core/testing';
import { LogsRxWebsocketService } from './logs.rx-websocket.service';
describe('LogsRxWebsocketService', () => {
let service: LogsRxWebsocketService;
beforeEach(() => {
TestBed.configureTestingModule({});
service = TestBed.inject(LogsRxWebsocketService);
});
it('should be created', () => {
expect(service).toBeTruthy();
});
});

View File

@@ -0,0 +1,23 @@
import { Injectable } from '@angular/core';
import {RxStompService} from './rx-stomp.service';
import {ApiService} from './api.service';
import SockJS from 'sockjs-client';
import {CONTEXT_PATH, getBaseUrl} from './config.service';
@Injectable({
providedIn: 'root'
})
export class LogsRxWebsocketService extends RxStompService {
constructor(private apiService: ApiService) {
super({
webSocketFactory: () => new SockJS(`${getBaseUrl()}${CONTEXT_PATH}/logs?access_token=${this.apiService.getToken()}`),
heartbeatIncoming: 0,
heartbeatOutgoing: 20000,
reconnectDelay: 200,
debug: (msg: string): void => {
console.log(new Date(), msg);
}
});
}
}

View File

@@ -0,0 +1,12 @@
import {RxStomp} from '@stomp/rx-stomp';
import {RxStompConfig} from '@stomp/rx-stomp/esm6/rx-stomp-config';
export class RxStompService extends RxStomp {
constructor(rxStompConfig: RxStompConfig) {
super();
super.configure(rxStompConfig);
super.activate();
}
}

View File

@@ -77,7 +77,7 @@ export class WebsocketService {
};
_socketListener = (frame) => {
console.log('Connected: ' + frame);
console.log(`Connected to ${this._options.socketUrl}: ${frame}`);
this._socket.stomp.subscribe(
this._options.topicName,
data => this.subscribers.forEach(subscriber => subscriber.observer.next(this.getMessage(data)))
@@ -94,7 +94,7 @@ export class WebsocketService {
scheduleReconnection = () => {
this.reconnectionPromise = setTimeout(() => {
console.log('Socket reconnecting... (if it fails, next attempt in ' + this._options.reconnectionTimeout + ' msec)');
console.log(`Socket reconnecting to ${this._options.socketUrl}... (if it fails, next attempt in ${this._options.reconnectionTimeout} msec)`);
this.connect();
}, this._options.reconnectionTimeout);
}