@@ -31,6 +31,7 @@ import com.mongodb.reactivestreams.client.ReactiveContextProvider;
|
|||||||
|
|
||||||
import io.micrometer.observation.Observation;
|
import io.micrometer.observation.Observation;
|
||||||
import io.micrometer.observation.ObservationRegistry;
|
import io.micrometer.observation.ObservationRegistry;
|
||||||
|
import io.micrometer.observation.contextpropagation.ObservationThreadLocalAccessor;
|
||||||
import reactor.core.CoreSubscriber;
|
import reactor.core.CoreSubscriber;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -85,8 +86,7 @@ public class ContextProviderFactory {
|
|||||||
|
|
||||||
Observation currentObservation = observationRegistry.getCurrentObservation();
|
Observation currentObservation = observationRegistry.getCurrentObservation();
|
||||||
if (currentObservation != null) {
|
if (currentObservation != null) {
|
||||||
// Aligned with ObservationThreadLocalAccessor.KEY
|
requestContext.put(ObservationThreadLocalAccessor.KEY, currentObservation);
|
||||||
requestContext.put("micrometer.observation", currentObservation);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return requestContext;
|
return requestContext;
|
||||||
|
|||||||
@@ -29,6 +29,7 @@ import com.mongodb.event.CommandSucceededEvent;
|
|||||||
|
|
||||||
import io.micrometer.observation.Observation;
|
import io.micrometer.observation.Observation;
|
||||||
import io.micrometer.observation.ObservationRegistry;
|
import io.micrometer.observation.ObservationRegistry;
|
||||||
|
import io.micrometer.observation.contextpropagation.ObservationThreadLocalAccessor;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Implement MongoDB's {@link CommandListener} using Micrometer's {@link Observation} API.
|
* Implement MongoDB's {@link CommandListener} using Micrometer's {@link Observation} API.
|
||||||
@@ -42,11 +43,6 @@ public class MongoObservationCommandListener implements CommandListener {
|
|||||||
|
|
||||||
private static final Log log = LogFactory.getLog(MongoObservationCommandListener.class);
|
private static final Log log = LogFactory.getLog(MongoObservationCommandListener.class);
|
||||||
|
|
||||||
/**
|
|
||||||
* Aligns with ObservationThreadLocalAccessor.KEY.
|
|
||||||
*/
|
|
||||||
private static final String MICROMETER_OBSERVATION_KEY = "micrometer.observation";
|
|
||||||
|
|
||||||
private final ObservationRegistry observationRegistry;
|
private final ObservationRegistry observationRegistry;
|
||||||
private final @Nullable ConnectionString connectionString;
|
private final @Nullable ConnectionString connectionString;
|
||||||
|
|
||||||
@@ -119,7 +115,7 @@ public class MongoObservationCommandListener implements CommandListener {
|
|||||||
|
|
||||||
observation.start();
|
observation.start();
|
||||||
|
|
||||||
requestContext.put(MICROMETER_OBSERVATION_KEY, observation);
|
requestContext.put(ObservationThreadLocalAccessor.KEY, observation);
|
||||||
|
|
||||||
if (log.isDebugEnabled()) {
|
if (log.isDebugEnabled()) {
|
||||||
log.debug(
|
log.debug(
|
||||||
@@ -136,7 +132,7 @@ public class MongoObservationCommandListener implements CommandListener {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
Observation observation = requestContext.getOrDefault(MICROMETER_OBSERVATION_KEY, null);
|
Observation observation = requestContext.getOrDefault(ObservationThreadLocalAccessor.KEY, null);
|
||||||
if (observation == null) {
|
if (observation == null) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@@ -160,7 +156,7 @@ public class MongoObservationCommandListener implements CommandListener {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
Observation observation = requestContext.getOrDefault(MICROMETER_OBSERVATION_KEY, null);
|
Observation observation = requestContext.getOrDefault(ObservationThreadLocalAccessor.KEY, null);
|
||||||
if (observation == null) {
|
if (observation == null) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@@ -185,7 +181,7 @@ public class MongoObservationCommandListener implements CommandListener {
|
|||||||
@Nullable
|
@Nullable
|
||||||
private static Observation observationFromContext(RequestContext context) {
|
private static Observation observationFromContext(RequestContext context) {
|
||||||
|
|
||||||
Observation observation = context.getOrDefault(MICROMETER_OBSERVATION_KEY, null);
|
Observation observation = context.getOrDefault(ObservationThreadLocalAccessor.KEY, null);
|
||||||
|
|
||||||
if (observation != null) {
|
if (observation != null) {
|
||||||
|
|
||||||
|
|||||||
@@ -73,7 +73,7 @@ public class ReactiveIntegrationTests extends SampleTestRunner {
|
|||||||
.verifyComplete();
|
.verifyComplete();
|
||||||
|
|
||||||
repository.findByLastname("Matthews") //
|
repository.findByLastname("Matthews") //
|
||||||
.contextWrite(Context.of("micrometer.observation", intermediate)) //
|
.contextWrite(Context.of(ObservationThreadLocalAccessor.KEY, intermediate)) //
|
||||||
.as(StepVerifier::create).assertNext(actual -> {
|
.as(StepVerifier::create).assertNext(actual -> {
|
||||||
|
|
||||||
assertThat(actual).extracting("firstname", "lastname").containsExactly("Dave", "Matthews");
|
assertThat(actual).extracting("firstname", "lastname").containsExactly("Dave", "Matthews");
|
||||||
|
|||||||
Reference in New Issue
Block a user