added zeebe extension tests

This commit is contained in:
Luc Weinbrecht
2022-05-16 08:12:23 +02:00
parent 00bfe0fb81
commit ec62e6e3c9
14 changed files with 537 additions and 154 deletions

View File

@@ -72,6 +72,13 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.camunda</groupId>
<artifactId>zeebe-process-test-extension</artifactId>
<version>${version.zeebe}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
@@ -159,6 +166,14 @@
<build>
<finalName>${project.artifactId}</finalName>
<testResources>
<testResource>
<directory>${project.basedir}/src/test/resources</directory>
</testResource>
<testResource>
<directory>${project.basedir}/src/main/resources</directory>
</testResource>
</testResources>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>

View File

@@ -26,7 +26,6 @@ public class SendCrossSellingRecommendation {
Long loanAgreementNumber = ((Number) job.getVariablesAsMap().get(LOAN_AGREEMENT_NUMBER)).longValue();
LoanAgreement loanAgreement = loanAgreementQuery.loadByNumber(new LoanAgreementNumber(loanAgreementNumber));
// TODO: How to get the business key?
recommendationTrigger.startLoanAgreement(new CaseId("11"), loanAgreement);
client.newCompleteCommand(job.getKey())

View File

@@ -34,7 +34,7 @@ import static org.mockito.Mockito.verify;
@Disabled
@SpringBootTest
@ZeebeSpringTest
class ProcessTest {
class ProcessTestSpring {
public static final String PROCESS_DEFINITION = "Loan_Agreement";
private static final String START_EVENT = "LoanAgreementReceivedStartEvent";

View File

@@ -0,0 +1,110 @@
package de.weinbrecht.luc.bpm.architecture.loan.agreement;
import io.camunda.zeebe.client.ZeebeClient;
import io.camunda.zeebe.client.api.command.DeployResourceCommandStep1;
import io.camunda.zeebe.client.api.response.ActivateJobsResponse;
import io.camunda.zeebe.client.api.response.DeploymentEvent;
import io.camunda.zeebe.client.api.response.PublishMessageResponse;
import io.camunda.zeebe.process.test.api.ZeebeTestEngine;
import io.camunda.zeebe.process.test.filters.RecordStream;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.value.JobRecordValue;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import static io.camunda.zeebe.process.test.filters.StreamFilter.jobRecords;
import static io.camunda.zeebe.protocol.record.intent.JobIntent.COMPLETED;
import static io.camunda.zeebe.protocol.record.intent.JobIntent.CREATED;
import static java.lang.String.format;
import static java.time.Duration.ofSeconds;
import static java.util.Collections.emptyMap;
import static java.util.stream.Collectors.toList;
import static org.assertj.core.api.Assertions.assertThat;
// https://github.com/camunda/zeebe-process-test/blob/main/qa/abstracts/src/main/java/io/camunda/zeebe/process/test/qa/abstracts/util/Utilities.java
public class ProcessTestUtils {
public static DeploymentEvent deployResource(final ZeebeClient client, final String resource) {
return deployResources(client, resource);
}
public static DeploymentEvent deployResources(
final ZeebeClient client, final String... resources) {
final DeployResourceCommandStep1 commandStep1 = client.newDeployResourceCommand();
DeployResourceCommandStep1.DeployResourceCommandStep2 commandStep2 = null;
for (final String process : resources) {
if (commandStep2 == null) {
commandStep2 = commandStep1.addResourceFromClasspath(process);
} else {
commandStep2 = commandStep2.addResourceFromClasspath(process);
}
}
return commandStep2.send().join();
}
public static PublishMessageResponse sendMessage(
final ZeebeTestEngine engine,
final ZeebeClient client,
final String messageName,
final String correlationKey)
throws InterruptedException, TimeoutException {
return sendMessage(engine, client, messageName, correlationKey, emptyMap());
}
public static PublishMessageResponse sendMessage(
final ZeebeTestEngine engine,
final ZeebeClient client,
final String messageName,
final String correlationKey,
final Map<String, Object> variables)
throws InterruptedException, TimeoutException {
final PublishMessageResponse response =
client
.newPublishMessageCommand()
.messageName(messageName)
.correlationKey(correlationKey)
.variables(variables)
.send()
.join();
engine.waitForIdleState(ofSeconds(1));
return response;
}
public static ActivateJobsResponse activateSingleJob(
final ZeebeClient client, final String jobType) {
return client.newActivateJobsCommand().jobType(jobType).maxJobsToActivate(1).send().join();
}
public static void completeTaskWithType(
final ZeebeTestEngine engine, final ZeebeClient client, final String taskId, final String jobType)
throws InterruptedException, TimeoutException {
final List<Record<JobRecordValue>> records =
jobRecords(RecordStream.of(engine.getRecordStreamSource()))
.withElementId(taskId)
.withIntent(CREATED)
.stream()
.collect(toList());
jobRecords(RecordStream.of(engine.getRecordStreamSource()))
.withElementId(taskId)
.withIntent(COMPLETED)
.stream()
.forEach(record -> records.removeIf(r -> record.getKey() == r.getKey()));
if (!records.isEmpty()) {
final Record<JobRecordValue> lastRecord;
lastRecord = records.get(records.size() - 1);
assertThat(lastRecord.getValue().getType()).isEqualTo(jobType);
client.newCompleteCommand(lastRecord.getKey()).send().join();
} else {
throw new IllegalStateException(
format("Tried to complete task `%s`, but it was not found", taskId));
}
engine.waitForIdleState(ofSeconds(1));
}
}

View File

@@ -0,0 +1,71 @@
package de.weinbrecht.luc.bpm.architecture.loan.agreement;
import io.camunda.zeebe.client.ZeebeClient;
import io.camunda.zeebe.client.api.response.PublishMessageResponse;
import io.camunda.zeebe.process.test.api.ZeebeTestEngine;
import io.camunda.zeebe.process.test.extension.ZeebeProcessTest;
import org.junit.jupiter.api.Test;
import static de.weinbrecht.luc.bpm.architecture.loan.agreement.ProcessTestUtils.*;
import static de.weinbrecht.luc.bpm.architecture.loan.agreement.adapter.common.ProcessConstants.*;
import static io.camunda.zeebe.process.test.assertions.BpmnAssert.assertThat;
import static java.util.Collections.singletonMap;
@ZeebeProcessTest
class ProcessTestZeebe {
private ZeebeTestEngine engine;
private ZeebeClient client;
private static final String START_EVENT = "LoanAgreementReceivedStartEvent";
private static final String APPROVE_RULE_TASK = "ApproveAgreementRuleTask";
private static final String APPROVE_AGREEMENT_SERVICE_TASK = "ApproveLoanAgreementServiceTask";
private static final String SEND_CROSS_SELLING_EVENT = "SendCrossSellingEvent";
private static final String APPROVED_END_EVENT = "LoanAgreementApprovedEndEvent";
private static final String REJECT_AGREEMENT_SERVICE_TASK = "RejectLoanAgreementServiceTask";
private static final String NOT_APPROVED_END_EVENT = "LoanAgreementNotApprovedEndEvent";
@Test
void testRunsHappyPath() throws Exception {
deployResources(client, "loan_agreement.bpmn", "approve_agreement.dmn");
final PublishMessageResponse response = sendMessage(engine, client,
LOAN_START_EVENT_MESSAGE_REF, "", singletonMap(LOAN_AGREEMENT_NUMBER, 1L));
assertThat(response).extractingProcessInstance()
.hasPassedElementsInOrder(START_EVENT, APPROVE_RULE_TASK)
.isWaitingAtElements(APPROVE_AGREEMENT_SERVICE_TASK);
completeTaskWithType(engine , client, APPROVE_AGREEMENT_SERVICE_TASK, LOAN_AGREEMENT_TASK);
assertThat(response).extractingProcessInstance()
.hasPassedElement(START_EVENT)
.isWaitingAtElements(SEND_CROSS_SELLING_EVENT);
completeTaskWithType(engine , client, SEND_CROSS_SELLING_EVENT, SEND_CROSS_SELLING_RECOMMENDATION_TASK);
assertThat(response).extractingProcessInstance()
.hasPassedElementsInOrder(SEND_CROSS_SELLING_EVENT, APPROVED_END_EVENT)
.isCompleted();
}
@Test
void testRunsExceptionPath() throws Exception {
deployResources(client, "loan_agreement.bpmn", "approve_agreement.dmn");
final PublishMessageResponse response = sendMessage(engine, client,
LOAN_START_EVENT_MESSAGE_REF, "", singletonMap(LOAN_AGREEMENT_NUMBER, 6L));
assertThat(response).extractingProcessInstance()
.hasPassedElementsInOrder(START_EVENT, APPROVE_RULE_TASK)
.isWaitingAtElements(REJECT_AGREEMENT_SERVICE_TASK);
completeTaskWithType(engine , client, REJECT_AGREEMENT_SERVICE_TASK, LOAN_REJECTION_TASK);
assertThat(response).extractingProcessInstance()
.hasPassedElementsInOrder(NOT_APPROVED_END_EVENT)
.hasNotPassedElement(SEND_CROSS_SELLING_EVENT)
.isCompleted();
}
}

View File

@@ -1,5 +1 @@
spring.datasource.url: jdbc:h2:file:./camunda-h2-database
camunda.bpm.admin-user:
id: admin
password: pw
spring.datasource.url: jdbc:h2:file:./camunda-h2-database

View File

@@ -1,22 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<bean id="processEngineConfiguration" class="org.camunda.bpm.engine.impl.cfg.StandaloneInMemProcessEngineConfiguration">
<property name="jdbcUrl" value="jdbc:h2:mem:camunda;DB_CLOSE_DELAY=1000" />
<property name="jdbcDriver" value="org.h2.Driver" />
<property name="jdbcUsername" value="sa" />
<property name="jdbcPassword" value="" />
<!-- Database configurations -->
<property name="databaseSchemaUpdate" value="true" />
<!-- job executor configurations -->
<property name="jobExecutorActivate" value="false" />
<property name="history" value="full" />
</bean>
</beans>

View File

@@ -71,6 +71,13 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.camunda</groupId>
<artifactId>zeebe-process-test-extension</artifactId>
<version>${version.zeebe}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
@@ -142,6 +149,14 @@
<build>
<finalName>${project.artifactId}</finalName>
<testResources>
<testResource>
<directory>${project.basedir}/src/test/resources</directory>
</testResource>
<testResource>
<directory>${project.basedir}/src/main/resources</directory>
</testResource>
</testResources>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>

View File

@@ -2,10 +2,12 @@ package de.weinbrecht.luc.bpm.architecture.recommendation.adapter.out.db.content
import lombok.RequiredArgsConstructor;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Component;
@RequiredArgsConstructor
@Component
@Profile("!test")
public class ExampleContentRunner implements CommandLineRunner {
private final ContentCRUDRepository contentCRUDRepository;

View File

@@ -1,103 +0,0 @@
package de.weinbrecht.luc.bpm.architecture.recommendation;
import de.weinbrecht.luc.bpm.architecture.recommendation.usecase.in.RecommendationPicker;
import de.weinbrecht.luc.bpm.architecture.recommendation.usecase.out.SendNotification;
import io.camunda.zeebe.client.ZeebeClient;
import io.camunda.zeebe.client.api.response.ActivatedJob;
import io.camunda.zeebe.process.test.api.ZeebeTestEngine;
import io.camunda.zeebe.process.test.filters.RecordStream;
import io.camunda.zeebe.spring.test.ZeebeSpringTest;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import static de.weinbrecht.luc.bpm.architecture.recommendation.adapter.common.ProcessConstants.*;
import static io.camunda.zeebe.process.test.filters.StreamFilter.processInstance;
import static io.camunda.zeebe.protocol.record.RejectionType.NULL_VAL;
import static io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent.ELEMENT_COMPLETED;
import static io.camunda.zeebe.protocol.record.value.BpmnElementType.PROCESS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.verify;
// Source: https://github.com/camunda-community-hub/camunda-cloud-examples/blob/main/twitter-review-java-springboot/src/test/java/org/camunda/community/examples/twitter/TestTwitterProcess.java
@Disabled
@SpringBootTest
@ZeebeSpringTest
class ProcessTest {
public static final String PROCESS_DEFINITION = "Cross_Selling_Recommendation";
private static final String START_EVENT = "CrossSellingStartEvent";
private static final String PICK_CONTENT_SERVICE_TASK = "PickContentServiceTask";
private static final String SEND_RECOMMENDATION_SERVICE_TASK = "SendRecommendationServiceTask";
private static final String END_EVENT = "CrossSellingRecommendationEndEvent";
@Autowired
private ZeebeClient zeebe;
@Autowired
private ZeebeTestEngine zeebeTestEngine;
@MockBean
private RecommendationPicker recommendationPicker;
@MockBean
private SendNotification sendNotification;
@Test
void shouldExecuteProcess_happy_path() throws Exception {
zeebe.newPublishMessageCommand().messageName(START_EVENT_MESSAGE_REF).correlationKey("").send();
// waitForProcessInstanceCompleted(getProcessInstanceId(RecordStream.of(zeebeTestEngine.getRecordStreamSource()), PROCESS_DEFINITION), Duration.ofSeconds(10));
waitForTaskAndComplete(PICK_CONTENT_SERVICE_TASK, PICK_CONTENT_TASK);
waitForTaskAndComplete(SEND_RECOMMENDATION_SERVICE_TASK, SEND_RECOMMENDATION_TASK);
assertTrue(isProcessInstanceCompleted(RecordStream.of(zeebeTestEngine.getRecordStreamSource()), PROCESS_DEFINITION));
verify(recommendationPicker).pickContent();
verify(sendNotification).send(any());
}
private ActivatedJob waitForTaskAndComplete(String taskId, String jobName) throws Exception {
// Let the workflow engine do whatever it needs to do
zeebeTestEngine.waitForIdleState(Duration.ofSeconds(10));
// Now get all user tasks
List<ActivatedJob> jobs = zeebe.newActivateJobsCommand().jobType(jobName).maxJobsToActivate(1).send().join().getJobs();
// Should be only one
assertTrue(jobs.size() > 0, "Job for user task '" + taskId + "' does not exist");
ActivatedJob taskJob = jobs.get(0);
// Make sure it is the right one
if (taskId != null) {
assertEquals(taskId, taskJob.getElementId());
}
zeebe.newCompleteCommand(taskJob.getKey()).send().join();
return taskJob;
}
private ActivatedJob waitForTaskAndComplete(String taskId, String jobName, Map<String, Object> variables) throws Exception {
ActivatedJob taskJob = this.waitForTaskAndComplete(taskId, jobName);
zeebe.newCompleteCommand(taskJob.getKey()).variables(variables).send().join();
return taskJob;
}
private boolean isProcessInstanceCompleted(RecordStream recordStream, String bpmnProcessId) {
return processInstance(recordStream).withBpmnProcessId(bpmnProcessId).withRejectionType(NULL_VAL).withBpmnElementType(PROCESS).withIntent(ELEMENT_COMPLETED).stream().findFirst().isPresent();
}
private long getProcessInstanceId(RecordStream recordStream, String bpmnProcessId) {
return processInstance(recordStream).withBpmnProcessId(bpmnProcessId).withRejectionType(NULL_VAL).withBpmnElementType(PROCESS).stream().findFirst().get().getKey();
}
}

View File

@@ -0,0 +1,166 @@
package de.weinbrecht.luc.bpm.architecture.recommendation;
import de.weinbrecht.luc.bpm.architecture.recommendation.domain.model.Content;
import de.weinbrecht.luc.bpm.architecture.recommendation.domain.model.ContentId;
import de.weinbrecht.luc.bpm.architecture.recommendation.domain.model.Description;
import de.weinbrecht.luc.bpm.architecture.recommendation.domain.model.Recommendation;
import de.weinbrecht.luc.bpm.architecture.recommendation.domain.model.customer.Customer;
import de.weinbrecht.luc.bpm.architecture.recommendation.domain.model.customer.CustomerId;
import de.weinbrecht.luc.bpm.architecture.recommendation.domain.model.customer.MailAddress;
import de.weinbrecht.luc.bpm.architecture.recommendation.domain.model.customer.Name;
import de.weinbrecht.luc.bpm.architecture.recommendation.usecase.in.RecommendationPicker;
import de.weinbrecht.luc.bpm.architecture.recommendation.usecase.out.RecommendationQuery;
import de.weinbrecht.luc.bpm.architecture.recommendation.usecase.out.SendNotification;
import io.camunda.zeebe.client.ZeebeClient;
import io.camunda.zeebe.client.api.response.ActivatedJob;
import io.camunda.zeebe.process.test.api.ZeebeTestEngine;
import io.camunda.zeebe.process.test.filters.RecordStream;
import io.camunda.zeebe.spring.test.ZeebeSpringTest;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.boot.test.mock.mockito.SpyBean;
import org.springframework.test.context.ActiveProfiles;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import static de.weinbrecht.luc.bpm.architecture.recommendation.adapter.common.ProcessConstants.*;
import static io.camunda.zeebe.process.test.assertions.BpmnAssert.assertThat;
import static io.camunda.zeebe.process.test.filters.RecordStream.of;
import static io.camunda.zeebe.process.test.filters.StreamFilter.processInstance;
import static io.camunda.zeebe.protocol.record.RejectionType.NULL_VAL;
import static io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent.ELEMENT_COMPLETED;
import static io.camunda.zeebe.protocol.record.value.BpmnElementType.PROCESS;
import static java.time.Duration.ofSeconds;
import static java.util.Collections.singletonMap;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
// Source: https://github.com/camunda-community-hub/camunda-cloud-examples/blob/main/twitter-review-java-springboot/src/test/java/org/camunda/community/examples/twitter/TestTwitterProcess.java
@ActiveProfiles("test")
@SpringBootTest
@ZeebeSpringTest
class ProcessTestSpring {
public static final String PROCESS_DEFINITION = "Cross_Selling_Recommendation";
private static final String START_EVENT = "CrossSellingStartEvent";
private static final String PICK_CONTENT_SERVICE_TASK = "PickContentServiceTask";
private static final String SEND_RECOMMENDATION_SERVICE_TASK = "SendRecommendationServiceTask";
private static final String END_EVENT = "CrossSellingRecommendationEndEvent";
@Autowired
private ZeebeClient zeebe;
@Autowired
private ZeebeTestEngine zeebeTestEngine;
@MockBean
private RecommendationQuery recommendationQuery;
@MockBean
private SendNotification sendNotification;
@SpyBean
private RecommendationPicker recommendationPicker;
private final ContentId contentId = new ContentId(1L);
private final CustomerId customerId = new CustomerId("A1");
private final Recommendation recommendation = new Recommendation(
new Customer(
customerId,
new Name("Tester"),
new MailAddress("tester@web.io")
),
new Content(contentId, new Description("FooBar"))
);
@BeforeEach
void setUp() {
when(recommendationQuery.findContentById(contentId))
.thenReturn(recommendation.getContent());
when(recommendationQuery.findCustomerById(customerId))
.thenReturn(recommendation.getCustomer());
}
@Test
void shouldExecuteProcess_happy_path() throws Exception {
zeebe.newPublishMessageCommand().messageName(START_EVENT_MESSAGE_REF)
.correlationKey("")
.variables(singletonMap(CUSTOMER_NUMBER, customerId.getValue()))
.send();
waitForTaskAndComplete(PICK_CONTENT_SERVICE_TASK, PICK_CONTENT_TASK, singletonMap(CONTENT_NUMBER, contentId.getValue()));
waitForTaskAndComplete(SEND_RECOMMENDATION_SERVICE_TASK, SEND_RECOMMENDATION_TASK);
assertTrue(isProcessInstanceCompleted(of(zeebeTestEngine.getRecordStreamSource()), PROCESS_DEFINITION));
verify(recommendationPicker).pickContent();
verify(sendNotification).send(recommendation);
}
private ActivatedJob waitForTaskAndComplete(String taskId, String jobType) throws Exception {
int maxRetry = 5;
ActivatedJob taskJob = null;
for (int i = 0; i < maxRetry; i++) {
taskJob = waitAndFetchJobs(taskId, jobType);
if (taskJob != null) {
// Make sure it is the right one
assertEquals(taskId, taskJob.getElementId());
zeebe.newCompleteCommand(taskJob.getKey()).send().join();
}
}
return taskJob;
}
private ActivatedJob waitForTaskAndComplete(String taskId, String jobType, Map<String, Object> variables) throws Exception {
int maxRetry = 5;
ActivatedJob taskJob = null;
for (int i = 0; i < maxRetry; i++) {
taskJob = waitAndFetchJobs(taskId, jobType);
if (taskJob != null) {
// Make sure it is the right one
assertEquals(taskId, taskJob.getElementId());
zeebe.newCompleteCommand(taskJob.getKey()).variables(variables).send().join();
}
}
return taskJob;
}
private ActivatedJob waitAndFetchJobs(String taskId, String jobType) throws InterruptedException, TimeoutException {
// Let the workflow engine do whatever it needs to do
zeebeTestEngine.waitForIdleState(ofSeconds(5));
// Now get all user tasks
List<ActivatedJob> jobs = zeebe.newActivateJobsCommand().jobType(jobType).maxJobsToActivate(1).send().join().getJobs();
if (jobs.size() == 0) {
return null;
}
return jobs.get(0);
}
private boolean isProcessInstanceCompleted(RecordStream recordStream, String bpmnProcessId) {
return processInstance(recordStream).withBpmnProcessId(bpmnProcessId)
.withRejectionType(NULL_VAL)
.withBpmnElementType(PROCESS)
.withIntent(ELEMENT_COMPLETED)
.stream().findFirst().isPresent();
}
private long getProcessInstanceId(RecordStream recordStream, String bpmnProcessId) {
return processInstance(recordStream).withBpmnProcessId(bpmnProcessId)
.withRejectionType(NULL_VAL)
.withBpmnElementType(PROCESS)
.stream().findFirst().get().getKey();
}
}

View File

@@ -0,0 +1,110 @@
package de.weinbrecht.luc.bpm.architecture.recommendation;
import io.camunda.zeebe.client.ZeebeClient;
import io.camunda.zeebe.client.api.command.DeployResourceCommandStep1;
import io.camunda.zeebe.client.api.response.ActivateJobsResponse;
import io.camunda.zeebe.client.api.response.DeploymentEvent;
import io.camunda.zeebe.client.api.response.PublishMessageResponse;
import io.camunda.zeebe.process.test.api.ZeebeTestEngine;
import io.camunda.zeebe.process.test.filters.RecordStream;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.value.JobRecordValue;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import static io.camunda.zeebe.process.test.filters.StreamFilter.jobRecords;
import static io.camunda.zeebe.protocol.record.intent.JobIntent.COMPLETED;
import static io.camunda.zeebe.protocol.record.intent.JobIntent.CREATED;
import static java.lang.String.format;
import static java.time.Duration.ofSeconds;
import static java.util.Collections.emptyMap;
import static java.util.stream.Collectors.toList;
import static org.assertj.core.api.Assertions.assertThat;
// https://github.com/camunda/zeebe-process-test/blob/main/qa/abstracts/src/main/java/io/camunda/zeebe/process/test/qa/abstracts/util/Utilities.java
public class ProcessTestUtils {
public static DeploymentEvent deployResource(final ZeebeClient client, final String resource) {
return deployResources(client, resource);
}
public static DeploymentEvent deployResources(
final ZeebeClient client, final String... resources) {
final DeployResourceCommandStep1 commandStep1 = client.newDeployResourceCommand();
DeployResourceCommandStep1.DeployResourceCommandStep2 commandStep2 = null;
for (final String process : resources) {
if (commandStep2 == null) {
commandStep2 = commandStep1.addResourceFromClasspath(process);
} else {
commandStep2 = commandStep2.addResourceFromClasspath(process);
}
}
return commandStep2.send().join();
}
public static PublishMessageResponse sendMessage(
final ZeebeTestEngine engine,
final ZeebeClient client,
final String messageName,
final String correlationKey)
throws InterruptedException, TimeoutException {
return sendMessage(engine, client, messageName, correlationKey, emptyMap());
}
public static PublishMessageResponse sendMessage(
final ZeebeTestEngine engine,
final ZeebeClient client,
final String messageName,
final String correlationKey,
final Map<String, Object> variables)
throws InterruptedException, TimeoutException {
final PublishMessageResponse response =
client
.newPublishMessageCommand()
.messageName(messageName)
.correlationKey(correlationKey)
.variables(variables)
.send()
.join();
engine.waitForIdleState(ofSeconds(1));
return response;
}
public static ActivateJobsResponse activateSingleJob(
final ZeebeClient client, final String jobType) {
return client.newActivateJobsCommand().jobType(jobType).maxJobsToActivate(1).send().join();
}
public static void completeTaskWithType(
final ZeebeTestEngine engine, final ZeebeClient client, final String taskId, final String jobType)
throws InterruptedException, TimeoutException {
final List<Record<JobRecordValue>> records =
jobRecords(RecordStream.of(engine.getRecordStreamSource()))
.withElementId(taskId)
.withIntent(CREATED)
.stream()
.collect(toList());
jobRecords(RecordStream.of(engine.getRecordStreamSource()))
.withElementId(taskId)
.withIntent(COMPLETED)
.stream()
.forEach(record -> records.removeIf(r -> record.getKey() == r.getKey()));
if (!records.isEmpty()) {
final Record<JobRecordValue> lastRecord;
lastRecord = records.get(records.size() - 1);
assertThat(lastRecord.getValue().getType()).isEqualTo(jobType);
client.newCompleteCommand(lastRecord.getKey()).send().join();
} else {
throw new IllegalStateException(
format("Tried to complete task `%s`, but it was not found", taskId));
}
engine.waitForIdleState(ofSeconds(1));
}
}

View File

@@ -0,0 +1,46 @@
package de.weinbrecht.luc.bpm.architecture.recommendation;
import io.camunda.zeebe.client.ZeebeClient;
import io.camunda.zeebe.client.api.response.PublishMessageResponse;
import io.camunda.zeebe.process.test.api.ZeebeTestEngine;
import io.camunda.zeebe.process.test.extension.ZeebeProcessTest;
import org.junit.jupiter.api.Test;
import static de.weinbrecht.luc.bpm.architecture.recommendation.ProcessTestUtils.*;
import static de.weinbrecht.luc.bpm.architecture.recommendation.adapter.common.ProcessConstants.*;
import static io.camunda.zeebe.process.test.assertions.BpmnAssert.assertThat;
@ZeebeProcessTest
class ProcessTestZeebe {
private ZeebeTestEngine engine;
private ZeebeClient client;
private static final String START_EVENT = "CrossSellingStartEvent";
private static final String PICK_CONTENT_SERVICE_TASK = "PickContentServiceTask";
private static final String SEND_RECOMMENDATION_SERVICE_TASK = "SendRecommendationServiceTask";
private static final String END_EVENT = "CrossSellingRecommendationEndEvent";
@Test
void testRunsProcess() throws Exception {
deployResource(client, "cross_selling_recommendation.bpmn");
final PublishMessageResponse response = sendMessage(engine, client, START_EVENT_MESSAGE_REF, "");
assertThat(response).extractingProcessInstance()
.hasPassedElement(START_EVENT)
.isWaitingAtElements(PICK_CONTENT_SERVICE_TASK);
completeTaskWithType(engine , client, PICK_CONTENT_SERVICE_TASK, PICK_CONTENT_TASK);
assertThat(response).extractingProcessInstance()
.hasPassedElement(PICK_CONTENT_SERVICE_TASK)
.isWaitingAtElements(SEND_RECOMMENDATION_SERVICE_TASK);
completeTaskWithType(engine , client, SEND_RECOMMENDATION_SERVICE_TASK, SEND_RECOMMENDATION_TASK);
assertThat(response).extractingProcessInstance()
.hasPassedElementsInOrder(SEND_RECOMMENDATION_SERVICE_TASK, END_EVENT)
.isCompleted();
}
}

View File

@@ -1,22 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<bean id="processEngineConfiguration" class="org.camunda.bpm.engine.impl.cfg.StandaloneInMemProcessEngineConfiguration">
<property name="jdbcUrl" value="jdbc:h2:mem:camunda;DB_CLOSE_DELAY=1000" />
<property name="jdbcDriver" value="org.h2.Driver" />
<property name="jdbcUsername" value="sa" />
<property name="jdbcPassword" value="" />
<!-- Database configurations -->
<property name="databaseSchemaUpdate" value="true" />
<!-- job executor configurations -->
<property name="jobExecutorActivate" value="false" />
<property name="history" value="full" />
</bean>
</beans>