Seldon Kafka Integration Example with SpaCy Reddit Model

In this example we will build upon the code from the Seldon SpaCy NLP example for text classification.

You will learn how to deploy the model using the Kafka protocol.

Requirements

To run the commands in this example you will need:

* helm 3.x
* kubectl

In the cluster you will need:

* Seldon Core installed (a hedged install sketch follows this list)
* Kafka installed as per the instructions below
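For reference, Seldon Core itself is typically installed with Helm. The following is only a minimal sketch based on the standard Seldon Core operator chart; the chart location and recommended flags may differ for your version, so check the Seldon Core installation docs:

!kubectl create namespace seldon-system

!helm install seldon-core seldon-core-operator --repo https://storage.googleapis.com/seldon-charts --namespace seldon-system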

Setup Kafka

Ensure your Helm repositories include the Strimzi charts, which we'll use to install Kafka.

!helm repo add strimzi https://strimzi.io/charts/

"strimzi" has been added to your repositories

Install the Kafka operator in your cluster

!helm install my-release strimzi/strimzi-kafka-operator

NAME: my-release
LAST DEPLOYED: Tue Sep 29 08:31:41 2020
NAMESPACE: default
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
Thank you for installing strimzi-kafka-operator-0.19.0

To create a Kafka cluster refer to the following documentation.

https://strimzi.io/docs/operators/0.19.0/using.html#deploying-cluster-operator-helm-chart-str

We now create a Kafka cluster with a simple single-replica setup, as outlined below.

%%writefile kafka-cluster-config.yaml
apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    replicas: 1
    listeners:
      plain: {}
      tls: {}
      external:
        type: nodeport
        tls: false
    storage:
      type: ephemeral
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
  zookeeper:
    replicas: 1
    storage:
      type: ephemeral
  entityOperator:
    topicOperator: {}
    userOperator: {}

Overwriting kafka-cluster-config.yaml

!kubectl apply -f kafka-cluster-config.yaml

kafka.kafka.strimzi.io/my-cluster created
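Creating the cluster can take a minute or two. As an optional convenience (not part of the original walkthrough), you can block until the Kafka custom resource reports a Ready condition:

!kubectl wait kafka/my-cluster --for=condition=Ready --timeout=300s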

We can now check that Kafka was installed correctly:

!kubectl get pods | grep my-cluster

my-cluster-entity-operator-df58f8b9f-s9wx5   3/3     Running   0          148m
my-cluster-kafka-0                           2/2     Running   0          149m
my-cluster-zookeeper-0                       1/1     Running   0          149m

Create topics

We now need to create the input and output topics for our Reddit classifier.

%%writefile topics.yaml
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaTopic
metadata:
  name: reddit-classifier-input
  labels:
    strimzi.io/cluster: "my-cluster"
spec:
  partitions: 2
  replicas: 1
---
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaTopic
metadata:
  name: reddit-classifier-output
  labels:
    strimzi.io/cluster: "my-cluster"
spec:
  partitions: 2
  replicas: 1

!kubectl apply -f topics.yaml

kafkatopic.kafka.strimzi.io/reddit-classifier-input created
kafkatopic.kafka.strimzi.io/reddit-classifier-output created
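Optionally (not part of the original walkthrough), you can confirm that both topic resources were created by listing the KafkaTopic custom resources:

!kubectl get kafkatopics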

Train the SpaCy SKLearn Model

To train the SpaCy SKLearn model you can follow the instructions in the SKLearn SpaCy Model Example.

Alternatively, you can use the pre-built image published on Seldon's Docker Hub: seldonio/reddit-classifier:0.1.

Deploy SpaCy Text Classifier

We can now define the YAML that will be used to deploy the model.

%%writefile sdep_reddit_kafka.yaml
apiVersion: machinelearning.seldon.io/v1
kind: SeldonDeployment
metadata:
  name: reddit-kafka
spec:
  serverType: kafka
  predictors:

Overwriting sdep_reddit_kafka.yaml
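The predictors section of the manifest is cut off above. As a hedged sketch only, a complete predictor block for this example could look roughly like the following, using the image from the training step, the topics created earlier, and the Strimzi bootstrap service. The svcOrchSpec environment variable names and the broker address are assumptions, not taken verbatim from this page, so check the Seldon Core Kafka documentation for your version:

  predictors:
  - name: default
    replicas: 1
    graph:
      name: classifier
      type: MODEL
    componentSpecs:
    - spec:
        containers:
        - name: classifier
          image: seldonio/reddit-classifier:0.1
    svcOrchSpec:
      env:
      - name: KAFKA_BROKER                 # assumed variable name
        value: my-cluster-kafka-bootstrap.default.svc.cluster.local:9092
      - name: KAFKA_INPUT_TOPIC            # assumed variable name
        value: reddit-classifier-input
      - name: KAFKA_OUTPUT_TOPIC           # assumed variable name
        value: reddit-classifier-output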

!kubectl apply -f sdep_reddit_kafka.yaml

seldondeployment.machinelearning.seldon.io/reddit-kafka configured

We can confirm that the model is now running as expected:

!kubectl get pods | grep reddit

reddit-kafka-default-0-classifier-c6ccdd66f-vmf4v 2/2 Running 0 45m
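As an additional optional check, you can inspect the status reported on the SeldonDeployment itself (using the sdep short name for the custom resource):

!kubectl get sdep reddit-kafka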

Send real time data for stream processing

We can now send real-time data for stream processing.

Below we send a single input with the text "This is an input" to the input topic, from which the deployment will consume it.

!kubectl run --quiet=true -it --rm kafkaproducer --image=bitnami/kafka:2.6.0 --restart=Never --command -- \
    bash -c "echo '{\"data\": {\"ndarray\": [\"This is an input\"]}}' \
    | kafka-console-producer.sh --broker-list my-cluster-kafka-external-bootstrap.default:9094 --topic reddit-classifier-input"
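If you want to confirm that the deployment picked the message up, one optional way (not part of the original walkthrough) is to tail the classifier container logs of the model deployment, whose name matches the pod listed earlier:

!kubectl logs deploy/reddit-kafka-default-0-classifier -c classifier --tail=20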

Check the data processed

We are now able to see all the data that has been pushed to the output topic reddit-classifier-output.

This shows all the inputs that have been processed. We listen for 10 seconds to make sure all the data is consumed; once that window elapses the console consumer exits with a TimeoutException, which is expected.

!kubectl run --quiet=true -it --rm kafkaconsumer --image=bitnami/kafka:2.6.0 --restart=Never --command -- \
    kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-external-bootstrap.default:9094 --topic reddit-classifier-output \
    --from-beginning --timeout-ms 10000

{"data":{"names":["t:0","t:1"],"ndarray":[[0.6758450844706712,0.32415491552932885]]},"meta":{}}

{"data":{"names":["t:0","t:1"],"ndarray":[[0.6758450844706712,0.32415491552932885]]},"meta":{}}

{"data":{"names":["t:0","t:1"],"ndarray":[[0.6758450844706712,0.32415491552932885]]},"meta":{}}

{"data":{"names":["t:0","t:1"],"ndarray":[[0.6758450844706712,0.32415491552932885]]},"meta":{}}

{"data":{"names":["t:0","t:1"],"ndarray":[[0.6758450844706712,0.32415491552932885]]},"meta":{}}

{"data":{"names":["t:0","t:1"],"ndarray":[[0.6758450844706712,0.32415491552932885]]},"meta":{}}

{"data":{"names":["t:0","t:1"],"ndarray":[[0.6758450844706712,0.32415491552932885]]},"meta":{}}

{"data":{"names":["t:0","t:1"],"ndarray":[[0.6758450844706712,0.32415491552932885]]},"meta":{}}

{"data":{"names":["t:0","t:1"],"ndarray":[[0.6758450844706712,0.32415491552932885]]},"meta":{}}

[2020-09-29 11:45:06,366] ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.TimeoutException
Processed a total of 9 messages