Skip to main content
On this page

Dynamic header injection and removal

There are multiple interceptors available for manipulating headers, either injection or regex based removal.

This demo will run you through some of these use cases step-by-step.

View the full demo in realtime

You can either follow all the steps manually, or watch the recording

Review the docker compose environment

As can be seen from docker-compose.yaml the demo environment consists of the following services:

  • gateway1
  • gateway2
  • kafka-client
  • kafka1
  • kafka2
  • kafka3
  • schema-registry
cat docker-compose.yaml

Starting the Docker environment

Start all your Docker processes, wait for them to be up and ready, then run in background:

  • --wait: Wait for services to be running|healthy. Implies detached mode.
  • --detach: Detached mode: Run containers in the background
docker compose up --detach --wait

Creating topic users on gateway1

Creating on gateway1:

  • Topic users with partitions:1 and replication-factor:1
kafka-topics \
--bootstrap-server localhost:6969 \
--replication-factor 1 \
--partitions 1 \
--create --if-not-exists \
--topic users

Adding interceptor inject-headers

Let's create the interceptor to inject various headers

step-06-inject-headers-interceptor.json:

{
"kind" : "Interceptor",
"apiVersion" : "gateway/v2",
"metadata" : {
"name" : "inject-headers"
},
"spec" : {
"comment" : "Adding interceptor: inject-headers",
"pluginClass" : "io.conduktor.gateway.interceptor.DynamicHeaderInjectionPlugin",
"priority" : 100,
"config" : {
"headers" : {
"X-MY-KEY" : "my own value",
"X-USER" : "{{user}}",
"X-INTERPOLATED" : "User {{user}} via ip {{userIp}}"
}
}
}
}
curl \
--silent \
--request PUT "http://localhost:8888/gateway/v2/interceptor" \
--header "Content-Type: application/json" \
--user "admin:conduktor" \
--data @step-06-inject-headers-interceptor.json | jq

Send tom and laura into users

Producing 2 messages in users in cluster gateway1

Sending 2 events

{
"name" : "tom",
"username" : "tom@conduktor.io",
"password" : "motorhead",
"visa" : "#abc123",
"address" : "Chancery lane, London"
}
{
"name" : "laura",
"username" : "laura@conduktor.io",
"password" : "kitesurf",
"visa" : "#888999XZ",
"address" : "Dubai, UAE"
}
echo '{"name":"tom","username":"tom@conduktor.io","password":"motorhead","visa":"#abc123","address":"Chancery lane, London"}' | \
kafka-console-producer \
--bootstrap-server localhost:6969 \
--topic users

echo '{"name":"laura","username":"laura@conduktor.io","password":"kitesurf","visa":"#888999XZ","address":"Dubai, UAE"}' | \
kafka-console-producer \
--bootstrap-server localhost:6969 \
--topic users

Verify tom and laura have the corresponding headers

Verify tom and laura have the corresponding headers in cluster gateway1

kafka-console-consumer \
--bootstrap-server localhost:6969 \
--topic users \
--from-beginning \
--max-messages 3 \
--timeout-ms 3000 \
--property print.headers=true | jq

Returns 2 events

{
"headers" : {
"X-INTERPOLATED" : "User anonymous via ip 172.23.0.1",
"X-MY-KEY" : "my own value",
"X-USER" : "anonymous"
},
"value" : {
"name" : "tom",
"username" : "tom@conduktor.io",
"password" : "motorhead",
"visa" : "#abc123",
"address" : "Chancery lane, London"
}
}
{
"headers" : {
"X-INTERPOLATED" : "User anonymous via ip 172.23.0.1",
"X-MY-KEY" : "my own value",
"X-USER" : "anonymous"
},
"value" : {
"name" : "laura",
"username" : "laura@conduktor.io",
"password" : "kitesurf",
"visa" : "#888999XZ",
"address" : "Dubai, UAE"
}
}

Adding interceptor remove-headers

Let's create the interceptor remove-headers to remove headers that match X-MY-.*

step-09-remove-headers-interceptor.json:

{
"kind" : "Interceptor",
"apiVersion" : "gateway/v2",
"metadata" : {
"name" : "remove-headers"
},
"spec" : {
"comment" : "Adding interceptor: remove-headers",
"pluginClass" : "io.conduktor.gateway.interceptor.safeguard.MessageHeaderRemovalPlugin",
"priority" : 100,
"config" : {
"headerKeyRegex" : "X-MY-.*"
}
}
}
curl \
--silent \
--request PUT "http://localhost:8888/gateway/v2/interceptor" \
--header "Content-Type: application/json" \
--user "admin:conduktor" \
--data @step-09-remove-headers-interceptor.json | jq

Verify tom and laura have the corresponding headers

Verify tom and laura have the corresponding headers in cluster gateway1

kafka-console-consumer \
--bootstrap-server localhost:6969 \
--topic users \
--from-beginning \
--max-messages 3 \
--timeout-ms 3000 \
--property print.headers=true | jq

returns 2 events

{
"headers" : {
"X-INTERPOLATED" : "User anonymous via ip 172.23.0.1",
"X-USER" : "anonymous"
},
"value" : {
"name" : "tom",
"username" : "tom@conduktor.io",
"password" : "motorhead",
"visa" : "#abc123",
"address" : "Chancery lane, London"
}
}
{
"headers" : {
"X-INTERPOLATED" : "User anonymous via ip 172.23.0.1",
"X-USER" : "anonymous"
},
"value" : {
"name" : "laura",
"username" : "laura@conduktor.io",
"password" : "kitesurf",
"visa" : "#888999XZ",
"address" : "Dubai, UAE"
}
}

Remove interceptor remove-headers

Let's delete the interceptor remove-headers so we can access all our headers again

curl \
--silent \
--request DELETE "http://localhost:8888/gateway/v2/interceptor/remove-headers" \
--header "Content-Type: application/json" \
--user "admin:conduktor" \
--data-raw '{
"vCluster" : "passthrough"
}' | jq

Verify tom and laura have X-MY-KEY back

Verify tom and laura have X-MY-KEY back in cluster gateway1

kafka-console-consumer \
--bootstrap-server localhost:6969 \
--topic users \
--from-beginning \
--max-messages 3 \
--timeout-ms 3000 \
--property print.headers=true | jq

returns 2 events

{
"headers" : {
"X-INTERPOLATED" : "User anonymous via ip 172.23.0.1",
"X-MY-KEY" : "my own value",
"X-USER" : "anonymous"
},
"value" : {
"name" : "tom",
"username" : "tom@conduktor.io",
"password" : "motorhead",
"visa" : "#abc123",
"address" : "Chancery lane, London"
}
}
{
"headers" : {
"X-INTERPOLATED" : "User anonymous via ip 172.23.0.1",
"X-MY-KEY" : "my own value",
"X-USER" : "anonymous"
},
"value" : {
"name" : "laura",
"username" : "laura@conduktor.io",
"password" : "kitesurf",
"visa" : "#888999XZ",
"address" : "Dubai, UAE"
}
}

Tearing down the docker environment

Remove all your docker processes and associated volumes

  • --volumes: Remove named volumes declared in the "volumes" section of the Compose file and anonymous volumes attached to containers.
docker compose down --volumes

Conclusion

Leveraging headers in Kafka is of tremendous help!