excercise 2 done
This commit is contained in:
@@ -0,0 +1,48 @@
|
||||
package org.distributed;
|
||||
|
||||
import org.apache.kafka.clients.consumer.*;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.Collections;
|
||||
import java.util.Properties;
|
||||
|
||||
public class Consumer {
|
||||
public static void main(String[] args) {
|
||||
|
||||
// Kafka configuration properties
|
||||
Properties props = new Properties();
|
||||
props.put(Tag.SERVER.label, Tag.SERVER_URL.label);
|
||||
props.put(Tag.DESERIALISER_KEY.label, Tag.DESERIALISER.label);
|
||||
props.put(Tag.DESERIALISER_VAL.label, Tag.DESERIALISER.label);
|
||||
props.put(Tag.GID.label, Tag.GROUP.label);
|
||||
|
||||
// Creating KafkaConsumer instance
|
||||
KafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(props);
|
||||
|
||||
// subscribe to topic
|
||||
|
||||
consumer.subscribe(Collections.singletonList(Tag.TOPIC.label));
|
||||
// Starting consuming the messages
|
||||
while (true) {
|
||||
|
||||
try (consumer) { // it gave me no other way to close consumer
|
||||
|
||||
// Providing poll for new records from Kafka topic
|
||||
ConsumerRecords<Object, Object> records = consumer.poll(Duration.ofMillis(100));
|
||||
|
||||
// Processing the received records
|
||||
for (ConsumerRecord<Object, Object> record : records) {
|
||||
// Printing the received message
|
||||
System.out.println("Nerds Received a message: " + record.value());
|
||||
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
System.err.println("Consumer: " + e.getMessage());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,28 @@
|
||||
package org.distributed;
|
||||
|
||||
import java.util.Properties;
|
||||
import org.apache.kafka.clients.producer.*;
|
||||
|
||||
public class Producer {
|
||||
public static void main(String[] args) {
|
||||
|
||||
// Kafka configuration properties
|
||||
Properties props = new Properties();
|
||||
props.put(Tag.SERVER.label, Tag.SERVER_URL.label);
|
||||
props.put(Tag.SERIALISER_KEY.label, Tag.SERIALISER.label);
|
||||
props.put(Tag.SERIALISER_VAL.label, Tag.SERIALISER.label);
|
||||
|
||||
// Creating KafkaProducer instance
|
||||
KafkaProducer<Object, Object> producer = new KafkaProducer<>(props);
|
||||
|
||||
// Topic and message that we want to send
|
||||
String message = "Hello, Nerds Welcome to Kafka!";
|
||||
|
||||
// Sending our message to Kafka topic
|
||||
producer.send(new ProducerRecord<>(Tag.TOPIC.label, message));
|
||||
|
||||
// Closing the producer
|
||||
producer.close();
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,27 @@
|
||||
package org.distributed;
|
||||
|
||||
public enum Tag {
|
||||
// Server
|
||||
SERVER("bootstrap.servers"),
|
||||
SERVER_URL("localhost:9092"),
|
||||
// deserialiser
|
||||
DESERIALISER_KEY("key.deserializer"),
|
||||
DESERIALISER_VAL("value.deserializer"),
|
||||
DESERIALISER("org.apache.kafka.common.serialization.StringDeserializer"),
|
||||
// serialiser
|
||||
SERIALISER_KEY("key.serializer"),
|
||||
SERIALISER_VAL("value.serializer"),
|
||||
SERIALISER("org.apache.kafka.common.serialization.StringSerializer"),
|
||||
// ids
|
||||
GID("group.id"),
|
||||
GROUP("consumer_GROUP"),
|
||||
// Topic
|
||||
TOPIC("nerds");
|
||||
|
||||
public final String label;
|
||||
|
||||
private Tag(String label) {
|
||||
this.label = label;
|
||||
}
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user