Finish but not works

This commit is contained in:
webmak1 2022-04-09 21:44:43 +03:00
parent 40d354af5b
commit dfcd60c4cb
12 changed files with 207 additions and 23 deletions

View File

@ -2,10 +2,9 @@
<br/>
### YouTube
https://www.youtube.com/watch?v=SqVfCyfCJqw
<div align="center">
<iframe width="853" height="480" src="https://www.youtube.com/embed/SqVfCyfCJqw" title="YouTube video player" frameborder="0" allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture" allowfullscreen></iframe>
</div>
<br/>

View File

@ -0,0 +1,24 @@
package org.javadev.kafkaexample;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaTemplate;
import java.time.LocalDateTime;
@SpringBootApplication
public class KafkaApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaApplication.class, args);
}
@Bean
CommandLineRunner commandLineRunner(KafkaTemplate<String, Message> kafkaTemplate) {
return args -> {
kafkaTemplate.send("amigoscode", new Message("Hello From Kafka", LocalDateTime.now()));
};
}
} // The end of Class;

View File

@ -1,13 +0,0 @@
package org.javadev.kafkaexample;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class KafkaExampleApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaExampleApplication.class, args);
}
}

View File

@ -0,0 +1,14 @@
package org.javadev.kafkaexample;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class KafkaListeners {
@KafkaListener(topics = "amigoscode", groupId = "groupId", containerGroup = "messageFactory")
void listener(Message data) {
System.out.println("Listener received: " + data + " ");
}
} // The End of Class;

View File

@ -0,0 +1,7 @@
package org.javadev.kafkaexample;
import java.time.LocalDateTime;
public record Message(String message, LocalDateTime created) {
}

View File

@ -0,0 +1,30 @@
package org.javadev.kafkaexample;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
@RestController
@RequestMapping("api/v1/messages")
public class MessageController {
private final KafkaTemplate<String, Message> kafkaTemplate;
public MessageController(KafkaTemplate<String, Message> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
@PostMapping
public void publish(@RequestBody MessageRequest request) {
Message message = new Message(request.message(), LocalDateTime.now());
System.out.println("message" + message);
kafkaTemplate.send("amigoscode", message);
}
} // The End of Class;

View File

@ -0,0 +1,5 @@
package org.javadev.kafkaexample;
public record MessageRequest(String message) {
} // The End of Class;

View File

@ -0,0 +1,50 @@
package org.javadev.kafkaexample.config;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.javadev.kafkaexample.Message;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaConsumerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
public Map<String, Object> consumerConfig(){
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
return props;
}
@Bean
public ConsumerFactory<String, Message> consumerFactory(){
JsonDeserializer<Message> jsonDeserializer = new JsonDeserializer<>();
jsonDeserializer.addTrustedPackages("org.javadev");
return new DefaultKafkaConsumerFactory<>(
consumerConfig(),
new StringDeserializer(),
new JsonDeserializer<>()
);
}
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Message>> factory(
ConsumerFactory<String, Message> consumerFactory
){
ConcurrentKafkaListenerContainerFactory<String, Message> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
return factory;
}
} // The End of Class;

View File

@ -0,0 +1,44 @@
package org.javadev.kafkaexample.config;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.javadev.kafkaexample.Message;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaProducerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
public Map<String, Object> producerConfig(){
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, Message> producerFactory(){
return new DefaultKafkaProducerFactory<>(producerConfig());
}
@Bean
public KafkaTemplate<String, Message> kafkaTemplate(
ProducerFactory<String, Message> producerFactory
){
return new KafkaTemplate<>(producerFactory);
}
} // The End of Class;

View File

@ -0,0 +1,15 @@
package org.javadev.kafkaexample.config;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;
@Configuration
public class KafkaTopicConfig {
@Bean
public NewTopic amigoscodeTopic(){
return TopicBuilder.name("amigoscode").build();
}
}

View File

@ -1 +1 @@
spring.kafka.bootstrap-servers=localhost:9092

View File

@ -1,11 +1,5 @@
# [Amigoscode, Nelson] Microservices and Distributed Systems [ENG, 2022] Kafka Part
<br/>
<div align="center">
<iframe width="853" height="480" src="https://www.youtube.com/embed/SqVfCyfCJqw" title="YouTube video player" frameborder="0" allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture" allowfullscreen></iframe>
</div>
<br/>
@ -22,6 +16,21 @@
![Application](/img/pic02.png?raw=true)
<br/>
```
// POST
$ curl \
--data '{
"message":"Hooray Amigoscode"
}' \
--header "Content-Type: application/json" \
--request POST \
--url http://localhost:8080/api/v1/messages \
| jq
```
<br/><br/>
---