diff --git a/loan-agreement/pom.xml b/loan-agreement/pom.xml index d47e6c4..093b25f 100644 --- a/loan-agreement/pom.xml +++ b/loan-agreement/pom.xml @@ -72,6 +72,13 @@ test + + io.camunda + zeebe-process-test-extension + ${version.zeebe} + test + + org.springframework.boot spring-boot-starter-web @@ -159,6 +166,14 @@ ${project.artifactId} + + + ${project.basedir}/src/test/resources + + + ${project.basedir}/src/main/resources + + org.apache.maven.plugins diff --git a/loan-agreement/src/main/java/de/weinbrecht/luc/bpm/architecture/loan/agreement/adapter/in/process/SendCrossSellingRecommendation.java b/loan-agreement/src/main/java/de/weinbrecht/luc/bpm/architecture/loan/agreement/adapter/in/process/SendCrossSellingRecommendation.java index a83e55f..4063a34 100644 --- a/loan-agreement/src/main/java/de/weinbrecht/luc/bpm/architecture/loan/agreement/adapter/in/process/SendCrossSellingRecommendation.java +++ b/loan-agreement/src/main/java/de/weinbrecht/luc/bpm/architecture/loan/agreement/adapter/in/process/SendCrossSellingRecommendation.java @@ -26,7 +26,6 @@ public class SendCrossSellingRecommendation { Long loanAgreementNumber = ((Number) job.getVariablesAsMap().get(LOAN_AGREEMENT_NUMBER)).longValue(); LoanAgreement loanAgreement = loanAgreementQuery.loadByNumber(new LoanAgreementNumber(loanAgreementNumber)); - // TODO: How to get the business key? recommendationTrigger.startLoanAgreement(new CaseId("11"), loanAgreement); client.newCompleteCommand(job.getKey()) diff --git a/loan-agreement/src/test/java/de/weinbrecht/luc/bpm/architecture/loan/agreement/ProcessTest.java b/loan-agreement/src/test/java/de/weinbrecht/luc/bpm/architecture/loan/agreement/ProcessTestSpring.java similarity index 99% rename from loan-agreement/src/test/java/de/weinbrecht/luc/bpm/architecture/loan/agreement/ProcessTest.java rename to loan-agreement/src/test/java/de/weinbrecht/luc/bpm/architecture/loan/agreement/ProcessTestSpring.java index d8dca3e..d191f7f 100644 --- a/loan-agreement/src/test/java/de/weinbrecht/luc/bpm/architecture/loan/agreement/ProcessTest.java +++ b/loan-agreement/src/test/java/de/weinbrecht/luc/bpm/architecture/loan/agreement/ProcessTestSpring.java @@ -34,7 +34,7 @@ import static org.mockito.Mockito.verify; @Disabled @SpringBootTest @ZeebeSpringTest -class ProcessTest { +class ProcessTestSpring { public static final String PROCESS_DEFINITION = "Loan_Agreement"; private static final String START_EVENT = "LoanAgreementReceivedStartEvent"; diff --git a/loan-agreement/src/test/java/de/weinbrecht/luc/bpm/architecture/loan/agreement/ProcessTestUtils.java b/loan-agreement/src/test/java/de/weinbrecht/luc/bpm/architecture/loan/agreement/ProcessTestUtils.java new file mode 100644 index 0000000..05c62eb --- /dev/null +++ b/loan-agreement/src/test/java/de/weinbrecht/luc/bpm/architecture/loan/agreement/ProcessTestUtils.java @@ -0,0 +1,110 @@ +package de.weinbrecht.luc.bpm.architecture.loan.agreement; + +import io.camunda.zeebe.client.ZeebeClient; +import io.camunda.zeebe.client.api.command.DeployResourceCommandStep1; +import io.camunda.zeebe.client.api.response.ActivateJobsResponse; +import io.camunda.zeebe.client.api.response.DeploymentEvent; +import io.camunda.zeebe.client.api.response.PublishMessageResponse; +import io.camunda.zeebe.process.test.api.ZeebeTestEngine; +import io.camunda.zeebe.process.test.filters.RecordStream; +import io.camunda.zeebe.protocol.record.Record; +import io.camunda.zeebe.protocol.record.value.JobRecordValue; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeoutException; + +import static io.camunda.zeebe.process.test.filters.StreamFilter.jobRecords; +import static io.camunda.zeebe.protocol.record.intent.JobIntent.COMPLETED; +import static io.camunda.zeebe.protocol.record.intent.JobIntent.CREATED; +import static java.lang.String.format; +import static java.time.Duration.ofSeconds; +import static java.util.Collections.emptyMap; +import static java.util.stream.Collectors.toList; +import static org.assertj.core.api.Assertions.assertThat; + +// https://github.com/camunda/zeebe-process-test/blob/main/qa/abstracts/src/main/java/io/camunda/zeebe/process/test/qa/abstracts/util/Utilities.java +public class ProcessTestUtils { + + public static DeploymentEvent deployResource(final ZeebeClient client, final String resource) { + return deployResources(client, resource); + } + + public static DeploymentEvent deployResources( + final ZeebeClient client, final String... resources) { + final DeployResourceCommandStep1 commandStep1 = client.newDeployResourceCommand(); + + DeployResourceCommandStep1.DeployResourceCommandStep2 commandStep2 = null; + for (final String process : resources) { + if (commandStep2 == null) { + commandStep2 = commandStep1.addResourceFromClasspath(process); + } else { + commandStep2 = commandStep2.addResourceFromClasspath(process); + } + } + + return commandStep2.send().join(); + } + + public static PublishMessageResponse sendMessage( + final ZeebeTestEngine engine, + final ZeebeClient client, + final String messageName, + final String correlationKey) + throws InterruptedException, TimeoutException { + return sendMessage(engine, client, messageName, correlationKey, emptyMap()); + } + + public static PublishMessageResponse sendMessage( + final ZeebeTestEngine engine, + final ZeebeClient client, + final String messageName, + final String correlationKey, + final Map variables) + throws InterruptedException, TimeoutException { + final PublishMessageResponse response = + client + .newPublishMessageCommand() + .messageName(messageName) + .correlationKey(correlationKey) + .variables(variables) + .send() + .join(); + engine.waitForIdleState(ofSeconds(1)); + return response; + } + + public static ActivateJobsResponse activateSingleJob( + final ZeebeClient client, final String jobType) { + return client.newActivateJobsCommand().jobType(jobType).maxJobsToActivate(1).send().join(); + } + + public static void completeTaskWithType( + final ZeebeTestEngine engine, final ZeebeClient client, final String taskId, final String jobType) + throws InterruptedException, TimeoutException { + final List> records = + jobRecords(RecordStream.of(engine.getRecordStreamSource())) + .withElementId(taskId) + .withIntent(CREATED) + .stream() + .collect(toList()); + + jobRecords(RecordStream.of(engine.getRecordStreamSource())) + .withElementId(taskId) + .withIntent(COMPLETED) + .stream() + .forEach(record -> records.removeIf(r -> record.getKey() == r.getKey())); + + if (!records.isEmpty()) { + final Record lastRecord; + lastRecord = records.get(records.size() - 1); + assertThat(lastRecord.getValue().getType()).isEqualTo(jobType); + client.newCompleteCommand(lastRecord.getKey()).send().join(); + } else { + throw new IllegalStateException( + format("Tried to complete task `%s`, but it was not found", taskId)); + } + + engine.waitForIdleState(ofSeconds(1)); + } +} diff --git a/loan-agreement/src/test/java/de/weinbrecht/luc/bpm/architecture/loan/agreement/ProcessTestZeebe.java b/loan-agreement/src/test/java/de/weinbrecht/luc/bpm/architecture/loan/agreement/ProcessTestZeebe.java new file mode 100644 index 0000000..bcd3348 --- /dev/null +++ b/loan-agreement/src/test/java/de/weinbrecht/luc/bpm/architecture/loan/agreement/ProcessTestZeebe.java @@ -0,0 +1,71 @@ +package de.weinbrecht.luc.bpm.architecture.loan.agreement; + +import io.camunda.zeebe.client.ZeebeClient; +import io.camunda.zeebe.client.api.response.PublishMessageResponse; +import io.camunda.zeebe.process.test.api.ZeebeTestEngine; +import io.camunda.zeebe.process.test.extension.ZeebeProcessTest; +import org.junit.jupiter.api.Test; + +import static de.weinbrecht.luc.bpm.architecture.loan.agreement.ProcessTestUtils.*; +import static de.weinbrecht.luc.bpm.architecture.loan.agreement.adapter.common.ProcessConstants.*; +import static io.camunda.zeebe.process.test.assertions.BpmnAssert.assertThat; +import static java.util.Collections.singletonMap; + +@ZeebeProcessTest +class ProcessTestZeebe { + + private ZeebeTestEngine engine; + private ZeebeClient client; + + private static final String START_EVENT = "LoanAgreementReceivedStartEvent"; + private static final String APPROVE_RULE_TASK = "ApproveAgreementRuleTask"; + private static final String APPROVE_AGREEMENT_SERVICE_TASK = "ApproveLoanAgreementServiceTask"; + private static final String SEND_CROSS_SELLING_EVENT = "SendCrossSellingEvent"; + private static final String APPROVED_END_EVENT = "LoanAgreementApprovedEndEvent"; + + private static final String REJECT_AGREEMENT_SERVICE_TASK = "RejectLoanAgreementServiceTask"; + private static final String NOT_APPROVED_END_EVENT = "LoanAgreementNotApprovedEndEvent"; + + @Test + void testRunsHappyPath() throws Exception { + deployResources(client, "loan_agreement.bpmn", "approve_agreement.dmn"); + + final PublishMessageResponse response = sendMessage(engine, client, + LOAN_START_EVENT_MESSAGE_REF, "", singletonMap(LOAN_AGREEMENT_NUMBER, 1L)); + + assertThat(response).extractingProcessInstance() + .hasPassedElementsInOrder(START_EVENT, APPROVE_RULE_TASK) + .isWaitingAtElements(APPROVE_AGREEMENT_SERVICE_TASK); + + completeTaskWithType(engine , client, APPROVE_AGREEMENT_SERVICE_TASK, LOAN_AGREEMENT_TASK); + + assertThat(response).extractingProcessInstance() + .hasPassedElement(START_EVENT) + .isWaitingAtElements(SEND_CROSS_SELLING_EVENT); + + completeTaskWithType(engine , client, SEND_CROSS_SELLING_EVENT, SEND_CROSS_SELLING_RECOMMENDATION_TASK); + + assertThat(response).extractingProcessInstance() + .hasPassedElementsInOrder(SEND_CROSS_SELLING_EVENT, APPROVED_END_EVENT) + .isCompleted(); + } + + @Test + void testRunsExceptionPath() throws Exception { + deployResources(client, "loan_agreement.bpmn", "approve_agreement.dmn"); + + final PublishMessageResponse response = sendMessage(engine, client, + LOAN_START_EVENT_MESSAGE_REF, "", singletonMap(LOAN_AGREEMENT_NUMBER, 6L)); + + assertThat(response).extractingProcessInstance() + .hasPassedElementsInOrder(START_EVENT, APPROVE_RULE_TASK) + .isWaitingAtElements(REJECT_AGREEMENT_SERVICE_TASK); + + completeTaskWithType(engine , client, REJECT_AGREEMENT_SERVICE_TASK, LOAN_REJECTION_TASK); + + assertThat(response).extractingProcessInstance() + .hasPassedElementsInOrder(NOT_APPROVED_END_EVENT) + .hasNotPassedElement(SEND_CROSS_SELLING_EVENT) + .isCompleted(); + } +} diff --git a/loan-agreement/src/test/resources/application.yaml b/loan-agreement/src/test/resources/application.yaml index f3df7e3..84e2eec 100644 --- a/loan-agreement/src/test/resources/application.yaml +++ b/loan-agreement/src/test/resources/application.yaml @@ -1,5 +1 @@ -spring.datasource.url: jdbc:h2:file:./camunda-h2-database - -camunda.bpm.admin-user: - id: admin - password: pw \ No newline at end of file +spring.datasource.url: jdbc:h2:file:./camunda-h2-database \ No newline at end of file diff --git a/loan-agreement/src/test/resources/camunda.cfg.xml b/loan-agreement/src/test/resources/camunda.cfg.xml deleted file mode 100644 index 9210af2..0000000 --- a/loan-agreement/src/test/resources/camunda.cfg.xml +++ /dev/null @@ -1,22 +0,0 @@ - - - - - - - - - - - - - - - - - - - - \ No newline at end of file diff --git a/recommendation/pom.xml b/recommendation/pom.xml index 61f385a..7ffc533 100644 --- a/recommendation/pom.xml +++ b/recommendation/pom.xml @@ -71,6 +71,13 @@ test + + io.camunda + zeebe-process-test-extension + ${version.zeebe} + test + + org.springframework.boot spring-boot-starter-web @@ -142,6 +149,14 @@ ${project.artifactId} + + + ${project.basedir}/src/test/resources + + + ${project.basedir}/src/main/resources + + org.apache.maven.plugins diff --git a/recommendation/src/main/java/de/weinbrecht/luc/bpm/architecture/recommendation/adapter/out/db/content/ExampleContentRunner.java b/recommendation/src/main/java/de/weinbrecht/luc/bpm/architecture/recommendation/adapter/out/db/content/ExampleContentRunner.java index 074ad05..0c5a954 100644 --- a/recommendation/src/main/java/de/weinbrecht/luc/bpm/architecture/recommendation/adapter/out/db/content/ExampleContentRunner.java +++ b/recommendation/src/main/java/de/weinbrecht/luc/bpm/architecture/recommendation/adapter/out/db/content/ExampleContentRunner.java @@ -2,10 +2,12 @@ package de.weinbrecht.luc.bpm.architecture.recommendation.adapter.out.db.content import lombok.RequiredArgsConstructor; import org.springframework.boot.CommandLineRunner; +import org.springframework.context.annotation.Profile; import org.springframework.stereotype.Component; @RequiredArgsConstructor @Component +@Profile("!test") public class ExampleContentRunner implements CommandLineRunner { private final ContentCRUDRepository contentCRUDRepository; diff --git a/recommendation/src/test/java/de/weinbrecht/luc/bpm/architecture/recommendation/ProcessTest.java b/recommendation/src/test/java/de/weinbrecht/luc/bpm/architecture/recommendation/ProcessTest.java deleted file mode 100644 index 03f2a31..0000000 --- a/recommendation/src/test/java/de/weinbrecht/luc/bpm/architecture/recommendation/ProcessTest.java +++ /dev/null @@ -1,103 +0,0 @@ -package de.weinbrecht.luc.bpm.architecture.recommendation; - -import de.weinbrecht.luc.bpm.architecture.recommendation.usecase.in.RecommendationPicker; -import de.weinbrecht.luc.bpm.architecture.recommendation.usecase.out.SendNotification; -import io.camunda.zeebe.client.ZeebeClient; -import io.camunda.zeebe.client.api.response.ActivatedJob; -import io.camunda.zeebe.process.test.api.ZeebeTestEngine; -import io.camunda.zeebe.process.test.filters.RecordStream; -import io.camunda.zeebe.spring.test.ZeebeSpringTest; -import org.junit.jupiter.api.Disabled; -import org.junit.jupiter.api.Test; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.boot.test.mock.mockito.MockBean; - -import java.time.Duration; -import java.util.List; -import java.util.Map; - -import static de.weinbrecht.luc.bpm.architecture.recommendation.adapter.common.ProcessConstants.*; -import static io.camunda.zeebe.process.test.filters.StreamFilter.processInstance; -import static io.camunda.zeebe.protocol.record.RejectionType.NULL_VAL; -import static io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent.ELEMENT_COMPLETED; -import static io.camunda.zeebe.protocol.record.value.BpmnElementType.PROCESS; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.verify; - -// Source: https://github.com/camunda-community-hub/camunda-cloud-examples/blob/main/twitter-review-java-springboot/src/test/java/org/camunda/community/examples/twitter/TestTwitterProcess.java -@Disabled -@SpringBootTest -@ZeebeSpringTest -class ProcessTest { - - public static final String PROCESS_DEFINITION = "Cross_Selling_Recommendation"; - - private static final String START_EVENT = "CrossSellingStartEvent"; - private static final String PICK_CONTENT_SERVICE_TASK = "PickContentServiceTask"; - private static final String SEND_RECOMMENDATION_SERVICE_TASK = "SendRecommendationServiceTask"; - private static final String END_EVENT = "CrossSellingRecommendationEndEvent"; - - @Autowired - private ZeebeClient zeebe; - - @Autowired - private ZeebeTestEngine zeebeTestEngine; - - @MockBean - private RecommendationPicker recommendationPicker; - - @MockBean - private SendNotification sendNotification; - - @Test - void shouldExecuteProcess_happy_path() throws Exception { - zeebe.newPublishMessageCommand().messageName(START_EVENT_MESSAGE_REF).correlationKey("").send(); - -// waitForProcessInstanceCompleted(getProcessInstanceId(RecordStream.of(zeebeTestEngine.getRecordStreamSource()), PROCESS_DEFINITION), Duration.ofSeconds(10)); - - waitForTaskAndComplete(PICK_CONTENT_SERVICE_TASK, PICK_CONTENT_TASK); - waitForTaskAndComplete(SEND_RECOMMENDATION_SERVICE_TASK, SEND_RECOMMENDATION_TASK); - - assertTrue(isProcessInstanceCompleted(RecordStream.of(zeebeTestEngine.getRecordStreamSource()), PROCESS_DEFINITION)); - - verify(recommendationPicker).pickContent(); - verify(sendNotification).send(any()); - } - - private ActivatedJob waitForTaskAndComplete(String taskId, String jobName) throws Exception { - // Let the workflow engine do whatever it needs to do - zeebeTestEngine.waitForIdleState(Duration.ofSeconds(10)); - - // Now get all user tasks - List jobs = zeebe.newActivateJobsCommand().jobType(jobName).maxJobsToActivate(1).send().join().getJobs(); - - // Should be only one - assertTrue(jobs.size() > 0, "Job for user task '" + taskId + "' does not exist"); - ActivatedJob taskJob = jobs.get(0); - // Make sure it is the right one - if (taskId != null) { - assertEquals(taskId, taskJob.getElementId()); - } - - zeebe.newCompleteCommand(taskJob.getKey()).send().join(); - - return taskJob; - } - - private ActivatedJob waitForTaskAndComplete(String taskId, String jobName, Map variables) throws Exception { - ActivatedJob taskJob = this.waitForTaskAndComplete(taskId, jobName); - zeebe.newCompleteCommand(taskJob.getKey()).variables(variables).send().join(); - return taskJob; - } - - private boolean isProcessInstanceCompleted(RecordStream recordStream, String bpmnProcessId) { - return processInstance(recordStream).withBpmnProcessId(bpmnProcessId).withRejectionType(NULL_VAL).withBpmnElementType(PROCESS).withIntent(ELEMENT_COMPLETED).stream().findFirst().isPresent(); - } - - private long getProcessInstanceId(RecordStream recordStream, String bpmnProcessId) { - return processInstance(recordStream).withBpmnProcessId(bpmnProcessId).withRejectionType(NULL_VAL).withBpmnElementType(PROCESS).stream().findFirst().get().getKey(); - } -} diff --git a/recommendation/src/test/java/de/weinbrecht/luc/bpm/architecture/recommendation/ProcessTestSpring.java b/recommendation/src/test/java/de/weinbrecht/luc/bpm/architecture/recommendation/ProcessTestSpring.java new file mode 100644 index 0000000..2993c8f --- /dev/null +++ b/recommendation/src/test/java/de/weinbrecht/luc/bpm/architecture/recommendation/ProcessTestSpring.java @@ -0,0 +1,166 @@ +package de.weinbrecht.luc.bpm.architecture.recommendation; + +import de.weinbrecht.luc.bpm.architecture.recommendation.domain.model.Content; +import de.weinbrecht.luc.bpm.architecture.recommendation.domain.model.ContentId; +import de.weinbrecht.luc.bpm.architecture.recommendation.domain.model.Description; +import de.weinbrecht.luc.bpm.architecture.recommendation.domain.model.Recommendation; +import de.weinbrecht.luc.bpm.architecture.recommendation.domain.model.customer.Customer; +import de.weinbrecht.luc.bpm.architecture.recommendation.domain.model.customer.CustomerId; +import de.weinbrecht.luc.bpm.architecture.recommendation.domain.model.customer.MailAddress; +import de.weinbrecht.luc.bpm.architecture.recommendation.domain.model.customer.Name; +import de.weinbrecht.luc.bpm.architecture.recommendation.usecase.in.RecommendationPicker; +import de.weinbrecht.luc.bpm.architecture.recommendation.usecase.out.RecommendationQuery; +import de.weinbrecht.luc.bpm.architecture.recommendation.usecase.out.SendNotification; +import io.camunda.zeebe.client.ZeebeClient; +import io.camunda.zeebe.client.api.response.ActivatedJob; +import io.camunda.zeebe.process.test.api.ZeebeTestEngine; +import io.camunda.zeebe.process.test.filters.RecordStream; +import io.camunda.zeebe.spring.test.ZeebeSpringTest; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.boot.test.mock.mockito.SpyBean; +import org.springframework.test.context.ActiveProfiles; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeoutException; + +import static de.weinbrecht.luc.bpm.architecture.recommendation.adapter.common.ProcessConstants.*; +import static io.camunda.zeebe.process.test.assertions.BpmnAssert.assertThat; +import static io.camunda.zeebe.process.test.filters.RecordStream.of; +import static io.camunda.zeebe.process.test.filters.StreamFilter.processInstance; +import static io.camunda.zeebe.protocol.record.RejectionType.NULL_VAL; +import static io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent.ELEMENT_COMPLETED; +import static io.camunda.zeebe.protocol.record.value.BpmnElementType.PROCESS; +import static java.time.Duration.ofSeconds; +import static java.util.Collections.singletonMap; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +// Source: https://github.com/camunda-community-hub/camunda-cloud-examples/blob/main/twitter-review-java-springboot/src/test/java/org/camunda/community/examples/twitter/TestTwitterProcess.java +@ActiveProfiles("test") +@SpringBootTest +@ZeebeSpringTest +class ProcessTestSpring { + + public static final String PROCESS_DEFINITION = "Cross_Selling_Recommendation"; + + private static final String START_EVENT = "CrossSellingStartEvent"; + private static final String PICK_CONTENT_SERVICE_TASK = "PickContentServiceTask"; + private static final String SEND_RECOMMENDATION_SERVICE_TASK = "SendRecommendationServiceTask"; + private static final String END_EVENT = "CrossSellingRecommendationEndEvent"; + + @Autowired + private ZeebeClient zeebe; + + @Autowired + private ZeebeTestEngine zeebeTestEngine; + + @MockBean + private RecommendationQuery recommendationQuery; + + @MockBean + private SendNotification sendNotification; + + @SpyBean + private RecommendationPicker recommendationPicker; + + private final ContentId contentId = new ContentId(1L); + private final CustomerId customerId = new CustomerId("A1"); + + private final Recommendation recommendation = new Recommendation( + new Customer( + customerId, + new Name("Tester"), + new MailAddress("tester@web.io") + ), + new Content(contentId, new Description("FooBar")) + ); + + @BeforeEach + void setUp() { + when(recommendationQuery.findContentById(contentId)) + .thenReturn(recommendation.getContent()); + when(recommendationQuery.findCustomerById(customerId)) + .thenReturn(recommendation.getCustomer()); + } + + @Test + void shouldExecuteProcess_happy_path() throws Exception { + zeebe.newPublishMessageCommand().messageName(START_EVENT_MESSAGE_REF) + .correlationKey("") + .variables(singletonMap(CUSTOMER_NUMBER, customerId.getValue())) + .send(); + + waitForTaskAndComplete(PICK_CONTENT_SERVICE_TASK, PICK_CONTENT_TASK, singletonMap(CONTENT_NUMBER, contentId.getValue())); + waitForTaskAndComplete(SEND_RECOMMENDATION_SERVICE_TASK, SEND_RECOMMENDATION_TASK); + + assertTrue(isProcessInstanceCompleted(of(zeebeTestEngine.getRecordStreamSource()), PROCESS_DEFINITION)); + + verify(recommendationPicker).pickContent(); + verify(sendNotification).send(recommendation); + } + + private ActivatedJob waitForTaskAndComplete(String taskId, String jobType) throws Exception { + int maxRetry = 5; + + ActivatedJob taskJob = null; + for (int i = 0; i < maxRetry; i++) { + taskJob = waitAndFetchJobs(taskId, jobType); + if (taskJob != null) { + // Make sure it is the right one + assertEquals(taskId, taskJob.getElementId()); + zeebe.newCompleteCommand(taskJob.getKey()).send().join(); + } + } + return taskJob; + } + + private ActivatedJob waitForTaskAndComplete(String taskId, String jobType, Map variables) throws Exception { + int maxRetry = 5; + + ActivatedJob taskJob = null; + for (int i = 0; i < maxRetry; i++) { + taskJob = waitAndFetchJobs(taskId, jobType); + if (taskJob != null) { + // Make sure it is the right one + assertEquals(taskId, taskJob.getElementId()); + zeebe.newCompleteCommand(taskJob.getKey()).variables(variables).send().join(); + } + } + return taskJob; + } + + private ActivatedJob waitAndFetchJobs(String taskId, String jobType) throws InterruptedException, TimeoutException { + // Let the workflow engine do whatever it needs to do + zeebeTestEngine.waitForIdleState(ofSeconds(5)); + + // Now get all user tasks + List jobs = zeebe.newActivateJobsCommand().jobType(jobType).maxJobsToActivate(1).send().join().getJobs(); + + if (jobs.size() == 0) { + return null; + } + return jobs.get(0); + } + + private boolean isProcessInstanceCompleted(RecordStream recordStream, String bpmnProcessId) { + return processInstance(recordStream).withBpmnProcessId(bpmnProcessId) + .withRejectionType(NULL_VAL) + .withBpmnElementType(PROCESS) + .withIntent(ELEMENT_COMPLETED) + .stream().findFirst().isPresent(); + } + + private long getProcessInstanceId(RecordStream recordStream, String bpmnProcessId) { + return processInstance(recordStream).withBpmnProcessId(bpmnProcessId) + .withRejectionType(NULL_VAL) + .withBpmnElementType(PROCESS) + .stream().findFirst().get().getKey(); + } +} diff --git a/recommendation/src/test/java/de/weinbrecht/luc/bpm/architecture/recommendation/ProcessTestUtils.java b/recommendation/src/test/java/de/weinbrecht/luc/bpm/architecture/recommendation/ProcessTestUtils.java new file mode 100644 index 0000000..7f28c50 --- /dev/null +++ b/recommendation/src/test/java/de/weinbrecht/luc/bpm/architecture/recommendation/ProcessTestUtils.java @@ -0,0 +1,110 @@ +package de.weinbrecht.luc.bpm.architecture.recommendation; + +import io.camunda.zeebe.client.ZeebeClient; +import io.camunda.zeebe.client.api.command.DeployResourceCommandStep1; +import io.camunda.zeebe.client.api.response.ActivateJobsResponse; +import io.camunda.zeebe.client.api.response.DeploymentEvent; +import io.camunda.zeebe.client.api.response.PublishMessageResponse; +import io.camunda.zeebe.process.test.api.ZeebeTestEngine; +import io.camunda.zeebe.process.test.filters.RecordStream; +import io.camunda.zeebe.protocol.record.Record; +import io.camunda.zeebe.protocol.record.value.JobRecordValue; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeoutException; + +import static io.camunda.zeebe.process.test.filters.StreamFilter.jobRecords; +import static io.camunda.zeebe.protocol.record.intent.JobIntent.COMPLETED; +import static io.camunda.zeebe.protocol.record.intent.JobIntent.CREATED; +import static java.lang.String.format; +import static java.time.Duration.ofSeconds; +import static java.util.Collections.emptyMap; +import static java.util.stream.Collectors.toList; +import static org.assertj.core.api.Assertions.assertThat; + +// https://github.com/camunda/zeebe-process-test/blob/main/qa/abstracts/src/main/java/io/camunda/zeebe/process/test/qa/abstracts/util/Utilities.java +public class ProcessTestUtils { + + public static DeploymentEvent deployResource(final ZeebeClient client, final String resource) { + return deployResources(client, resource); + } + + public static DeploymentEvent deployResources( + final ZeebeClient client, final String... resources) { + final DeployResourceCommandStep1 commandStep1 = client.newDeployResourceCommand(); + + DeployResourceCommandStep1.DeployResourceCommandStep2 commandStep2 = null; + for (final String process : resources) { + if (commandStep2 == null) { + commandStep2 = commandStep1.addResourceFromClasspath(process); + } else { + commandStep2 = commandStep2.addResourceFromClasspath(process); + } + } + + return commandStep2.send().join(); + } + + public static PublishMessageResponse sendMessage( + final ZeebeTestEngine engine, + final ZeebeClient client, + final String messageName, + final String correlationKey) + throws InterruptedException, TimeoutException { + return sendMessage(engine, client, messageName, correlationKey, emptyMap()); + } + + public static PublishMessageResponse sendMessage( + final ZeebeTestEngine engine, + final ZeebeClient client, + final String messageName, + final String correlationKey, + final Map variables) + throws InterruptedException, TimeoutException { + final PublishMessageResponse response = + client + .newPublishMessageCommand() + .messageName(messageName) + .correlationKey(correlationKey) + .variables(variables) + .send() + .join(); + engine.waitForIdleState(ofSeconds(1)); + return response; + } + + public static ActivateJobsResponse activateSingleJob( + final ZeebeClient client, final String jobType) { + return client.newActivateJobsCommand().jobType(jobType).maxJobsToActivate(1).send().join(); + } + + public static void completeTaskWithType( + final ZeebeTestEngine engine, final ZeebeClient client, final String taskId, final String jobType) + throws InterruptedException, TimeoutException { + final List> records = + jobRecords(RecordStream.of(engine.getRecordStreamSource())) + .withElementId(taskId) + .withIntent(CREATED) + .stream() + .collect(toList()); + + jobRecords(RecordStream.of(engine.getRecordStreamSource())) + .withElementId(taskId) + .withIntent(COMPLETED) + .stream() + .forEach(record -> records.removeIf(r -> record.getKey() == r.getKey())); + + if (!records.isEmpty()) { + final Record lastRecord; + lastRecord = records.get(records.size() - 1); + assertThat(lastRecord.getValue().getType()).isEqualTo(jobType); + client.newCompleteCommand(lastRecord.getKey()).send().join(); + } else { + throw new IllegalStateException( + format("Tried to complete task `%s`, but it was not found", taskId)); + } + + engine.waitForIdleState(ofSeconds(1)); + } +} diff --git a/recommendation/src/test/java/de/weinbrecht/luc/bpm/architecture/recommendation/ProcessTestZeebe.java b/recommendation/src/test/java/de/weinbrecht/luc/bpm/architecture/recommendation/ProcessTestZeebe.java new file mode 100644 index 0000000..e22ea87 --- /dev/null +++ b/recommendation/src/test/java/de/weinbrecht/luc/bpm/architecture/recommendation/ProcessTestZeebe.java @@ -0,0 +1,46 @@ +package de.weinbrecht.luc.bpm.architecture.recommendation; + +import io.camunda.zeebe.client.ZeebeClient; +import io.camunda.zeebe.client.api.response.PublishMessageResponse; +import io.camunda.zeebe.process.test.api.ZeebeTestEngine; +import io.camunda.zeebe.process.test.extension.ZeebeProcessTest; +import org.junit.jupiter.api.Test; + +import static de.weinbrecht.luc.bpm.architecture.recommendation.ProcessTestUtils.*; +import static de.weinbrecht.luc.bpm.architecture.recommendation.adapter.common.ProcessConstants.*; +import static io.camunda.zeebe.process.test.assertions.BpmnAssert.assertThat; + +@ZeebeProcessTest +class ProcessTestZeebe { + + private ZeebeTestEngine engine; + private ZeebeClient client; + + private static final String START_EVENT = "CrossSellingStartEvent"; + private static final String PICK_CONTENT_SERVICE_TASK = "PickContentServiceTask"; + private static final String SEND_RECOMMENDATION_SERVICE_TASK = "SendRecommendationServiceTask"; + private static final String END_EVENT = "CrossSellingRecommendationEndEvent"; + + @Test + void testRunsProcess() throws Exception { + deployResource(client, "cross_selling_recommendation.bpmn"); + + final PublishMessageResponse response = sendMessage(engine, client, START_EVENT_MESSAGE_REF, ""); + + assertThat(response).extractingProcessInstance() + .hasPassedElement(START_EVENT) + .isWaitingAtElements(PICK_CONTENT_SERVICE_TASK); + + completeTaskWithType(engine , client, PICK_CONTENT_SERVICE_TASK, PICK_CONTENT_TASK); + + assertThat(response).extractingProcessInstance() + .hasPassedElement(PICK_CONTENT_SERVICE_TASK) + .isWaitingAtElements(SEND_RECOMMENDATION_SERVICE_TASK); + + completeTaskWithType(engine , client, SEND_RECOMMENDATION_SERVICE_TASK, SEND_RECOMMENDATION_TASK); + + assertThat(response).extractingProcessInstance() + .hasPassedElementsInOrder(SEND_RECOMMENDATION_SERVICE_TASK, END_EVENT) + .isCompleted(); + } +} diff --git a/recommendation/src/test/resources/camunda.cfg.xml b/recommendation/src/test/resources/camunda.cfg.xml deleted file mode 100644 index 9210af2..0000000 --- a/recommendation/src/test/resources/camunda.cfg.xml +++ /dev/null @@ -1,22 +0,0 @@ - - - - - - - - - - - - - - - - - - - - \ No newline at end of file