#103 appended the trigger key to the websocket topic

This commit is contained in:
Fabio Formosa
2022-12-20 20:51:10 +01:00
parent ac63576704
commit 261dd8b624
9 changed files with 53 additions and 15 deletions

View File

@@ -2,10 +2,12 @@ import {Component, OnInit, Input, Output, EventEmitter, OnDestroy} from '@angula
import {LogsWebsocketService, ApiService, getBaseUrl, CONTEXT_PATH, QuartzManagerWebsocketMessage} from '../../services'; import {LogsWebsocketService, ApiService, getBaseUrl, CONTEXT_PATH, QuartzManagerWebsocketMessage} from '../../services';
import {Observable} from 'rxjs'; import {Observable} from 'rxjs';
import {RxStompService, } from '../../services/rx-stomp.service'; import {RxStompService,} from '../../services/rx-stomp.service';
import {RxStompConfig} from '@stomp/rx-stomp/esm6/rx-stomp-config'; import {RxStompConfig} from '@stomp/rx-stomp/esm6/rx-stomp-config';
import {LogsRxWebsocketService} from '../../services/logs.rx-websocket.service'; import {LogsRxWebsocketService} from '../../services/logs.rx-websocket.service';
import {map} from 'rxjs/operators'; import {map} from 'rxjs/operators';
import {Trigger} from '../../model/trigger.model';
import {TriggerKey} from '../../model/triggerKey.model';
@Component({ @Component({
@@ -21,6 +23,8 @@ export class LogsPanelComponent implements OnInit, OnDestroy {
topicSubscription; topicSubscription;
private selectedTriggerKey: TriggerKey;
constructor( constructor(
// private logsWebsocketService: LogsWebsocketService, // private logsWebsocketService: LogsWebsocketService,
private logsRxWebsocketService: LogsRxWebsocketService, private logsRxWebsocketService: LogsRxWebsocketService,
@@ -28,6 +32,14 @@ export class LogsPanelComponent implements OnInit, OnDestroy {
) { ) {
} }
@Input()
set triggerKey(triggerKey: TriggerKey) {
this.selectedTriggerKey = {...triggerKey} as TriggerKey;
if (this.selectedTriggerKey && this.selectedTriggerKey.name) {
this._subscribeToTheTopic(this.selectedTriggerKey);
}
}
ngOnInit() { ngOnInit() {
// const obs = this.logsWebsocketService.getObservable() // const obs = this.logsWebsocketService.getObservable()
// obs.subscribe({ // obs.subscribe({
@@ -37,17 +49,34 @@ export class LogsPanelComponent implements OnInit, OnDestroy {
// } // }
// }); // });
this.topicSubscription = this.logsRxWebsocketService.watch('/topic/logs') // this.topicSubscription = this.logsRxWebsocketService.watch('/topic/logs')
// .pipe(map(msg => JSON.parse(msg.body)))
// .subscribe(this._showNewLog, (err) => {
// console.log(err);
// // TODO in case of 401
// // this.apiService.get('/quartz-manager/session/refresh');
// });
}
private _subscribeToTheTopic = (triggerKey: TriggerKey) => {
if (this.topicSubscription) {
this.topicSubscription.unsubscribe();
}
this.topicSubscription = this.logsRxWebsocketService.watch(`/topic/logs/${triggerKey.name}`)
.pipe(map(msg => JSON.parse(msg.body))) .pipe(map(msg => JSON.parse(msg.body)))
.subscribe(this._showNewLog, (err) => { .subscribe(this._showNewLog, (err) => {
console.log(err); console.log(err);
// TODO in case of 401 // TODO in case of 401
// this.apiService.get('/quartz-manager/session/refresh'); // this.apiService.get('/quartz-manager/session/refresh');
}); });
} };
ngOnDestroy() { ngOnDestroy() {
if (this.topicSubscription) {
this.topicSubscription.unsubscribe();
}
this.topicSubscription.unsubscribe(); this.topicSubscription.unsubscribe();
this.topicSubscription = null;
} }
// onNewLogMsg = (receivedMsg) => { // onNewLogMsg = (receivedMsg) => {

View File

@@ -73,8 +73,10 @@ export class SimpleTriggerConfigComponent implements OnInit {
@Input() @Input()
set triggerKey(triggerKey: TriggerKey) { set triggerKey(triggerKey: TriggerKey) {
this.selectedTriggerKey = {...triggerKey} as TriggerKey; if (!this.selectedTriggerKey || this.selectedTriggerKey.name !== triggerKey.name){
this.fetchSelectedTrigger(); this.selectedTriggerKey = {...triggerKey} as TriggerKey;
this.fetchSelectedTrigger();
}
} }

View File

@@ -90,7 +90,7 @@ export class TriggerListComponent implements OnInit {
onNewTrigger(newTrigger: SimpleTrigger) { onNewTrigger(newTrigger: SimpleTrigger) {
this.newTriggers = [newTrigger, ...this.newTriggers]; this.newTriggers = [newTrigger, ...this.newTriggers];
this.selectedTrigger = newTrigger.triggerKeyDTO; this.selectTrigger(newTrigger.triggerKeyDTO);
} }
} }

View File

@@ -30,7 +30,10 @@
<div fxFlex="1 1 auto" style="margin-left: 20px;"> <div fxFlex="1 1 auto" style="margin-left: 20px;">
<div fxFlex="1 1 auto" fxLayout="column" fxLayoutAlign="start stretch" fxLayoutGap="6px"> <div fxFlex="1 1 auto" fxLayout="column" fxLayoutAlign="start stretch" fxLayoutGap="6px">
<progress-panel></progress-panel> <progress-panel></progress-panel>
<logs-panel fxFlex="1 1 auto" fxFill></logs-panel> <logs-panel fxFlex="1 1 auto" fxFill
[triggerKey]=selectedTriggerKey
>
</logs-panel>
</div> </div>
</div> </div>

View File

@@ -38,11 +38,13 @@ public abstract class AbstractQuartzManagerJob implements Job {
LogRecord logMsg = doIt(jobExecutionContext); LogRecord logMsg = doIt(jobExecutionContext);
log.info(logMsg.getMessage()); log.info(logMsg.getMessage());
String triggerName = jobExecutionContext.getTrigger().getKey().getName();
logMsg.setThreadName(Thread.currentThread().getName()); logMsg.setThreadName(Thread.currentThread().getName());
webSocketLogsNotifier.send(logMsg); webSocketLogsNotifier.send(triggerName, logMsg);
TriggerFiredBundleDTO triggerFiredBundleDTO = WebSocketProgressNotifier.buildTriggerFiredBundle(jobExecutionContext); TriggerFiredBundleDTO triggerFiredBundleDTO = WebSocketProgressNotifier.buildTriggerFiredBundle(jobExecutionContext);
webSocketProgressNotifier.send(triggerFiredBundleDTO); webSocketProgressNotifier.send(triggerName, triggerFiredBundleDTO);
} }
} }

View File

@@ -14,7 +14,7 @@ public class WebSocketLogsNotifier implements WebhookSender<LogRecord> {
private SimpMessageSendingOperations messagingTemplate; private SimpMessageSendingOperations messagingTemplate;
@Override @Override
public void send(LogRecord logRecord) { public void send(String triggerName, LogRecord logRecord) {
messagingTemplate.convertAndSend(TOPIC_LOGS, logRecord); messagingTemplate.convertAndSend(TOPIC_LOGS + "/" + triggerName, logRecord);
} }
} }

View File

@@ -20,7 +20,7 @@ public class WebSocketProgressNotifier implements WebhookSender<TriggerFiredBund
private SimpMessageSendingOperations messagingTemplate; private SimpMessageSendingOperations messagingTemplate;
@Override @Override
public void send(TriggerFiredBundleDTO triggerFiredBundleDTO) { public void send(String triggerName, TriggerFiredBundleDTO triggerFiredBundleDTO) {
messagingTemplate.convertAndSend(TOPIC_PROGRESS, triggerFiredBundleDTO); messagingTemplate.convertAndSend(TOPIC_PROGRESS, triggerFiredBundleDTO);
} }

View File

@@ -9,6 +9,6 @@ package it.fabioformosa.quartzmanager.api.websockets;
*/ */
public interface WebhookSender<T> { public interface WebhookSender<T> {
void send(T message); void send(String triggerName, T message);
} }

View File

@@ -32,6 +32,7 @@ class SampleJobTest {
@Test @Test
void givenASampleJob_whenTheJobIsExecuted_thenTheWebhookSendersAreCalled() { void givenASampleJob_whenTheJobIsExecuted_thenTheWebhookSendersAreCalled() {
JobExecutionContext jobExecutionContext = Mockito.mock(JobExecutionContext.class); JobExecutionContext jobExecutionContext = Mockito.mock(JobExecutionContext.class);
String triggerName = "test-trigger";
ScheduleBuilder schedulerBuilder = SimpleScheduleBuilder.simpleSchedule() ScheduleBuilder schedulerBuilder = SimpleScheduleBuilder.simpleSchedule()
.withRepeatCount(5) .withRepeatCount(5)
@@ -40,6 +41,7 @@ class SampleJobTest {
.newJob(SampleJob.class).withIdentity(JobKey.jobKey("test-job")) .newJob(SampleJob.class).withIdentity(JobKey.jobKey("test-job"))
.build(); .build();
Trigger trigger = TriggerBuilder.newTrigger() Trigger trigger = TriggerBuilder.newTrigger()
.withIdentity(triggerName)
.forJob(jobDetail) .forJob(jobDetail)
.withSchedule(schedulerBuilder) .withSchedule(schedulerBuilder)
.build(); .build();
@@ -47,14 +49,14 @@ class SampleJobTest {
Mockito.when(jobExecutionContext.getJobDetail()).thenReturn(jobDetail); Mockito.when(jobExecutionContext.getJobDetail()).thenReturn(jobDetail);
sampleJob.execute(jobExecutionContext); sampleJob.execute(jobExecutionContext);
Mockito.verify(webSocketLogsNotifier).send(argThat(actualLogRecord -> { Mockito.verify(webSocketLogsNotifier).send(triggerName, argThat(actualLogRecord -> {
Assertions.assertThat(actualLogRecord.getMessage()).isEqualTo("Hello!"); Assertions.assertThat(actualLogRecord.getMessage()).isEqualTo("Hello!");
Assertions.assertThat(actualLogRecord.getType()).isEqualTo(LogRecord.LogType.INFO); Assertions.assertThat(actualLogRecord.getType()).isEqualTo(LogRecord.LogType.INFO);
Assertions.assertThat(actualLogRecord.getDate()).isNotNull(); Assertions.assertThat(actualLogRecord.getDate()).isNotNull();
Assertions.assertThat(actualLogRecord.getThreadName()).isNotNull(); Assertions.assertThat(actualLogRecord.getThreadName()).isNotNull();
return true; return true;
})); }));
Mockito.verify(webSocketProgressNotifier).send(argThat(triggerFiredBundleDTO -> { Mockito.verify(webSocketProgressNotifier).send(triggerName, argThat(triggerFiredBundleDTO -> {
Assertions.assertThat(triggerFiredBundleDTO.getJobKey()).isEqualTo("test-job"); Assertions.assertThat(triggerFiredBundleDTO.getJobKey()).isEqualTo("test-job");
Assertions.assertThat(triggerFiredBundleDTO.getRepeatCount()).isEqualTo(6); Assertions.assertThat(triggerFiredBundleDTO.getRepeatCount()).isEqualTo(6);
Assertions.assertThat(triggerFiredBundleDTO.getJobClass()).isEqualTo(SampleJob.class.getName()); Assertions.assertThat(triggerFiredBundleDTO.getJobClass()).isEqualTo(SampleJob.class.getName());