first commit

This commit is contained in:
2025-12-11 21:50:58 +01:00
commit 506ee6ef22
18 changed files with 894 additions and 0 deletions

View File

@@ -0,0 +1,206 @@
package org.distributed;
/**
* ChatHandler.java
* by Tinsae Ghilay
* Handles chat session, message sending and receiving using JMS queues.
* Uses ActiveMQ as the message broker.
* done for Distributed Systems Course - Week 2 Task 2
* Date: June 2024
*/
import java.util.Scanner;
import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;
// ChatHandler class to manage chat sessions
public class ChatHandler {
// chatting ids
private String userId, corespondent;
private Connection connection;
private Session session;
int status = 0; // 0: offline, 1: online
// Initialize connection and session
public void initializeSession() throws JMSException {
String BROKER_URL = "tcp://localhost:61616";
ConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL);
this.connection = factory.createConnection();
this.connection.start();
this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
// Start continuous conversation with multiple receivers
public void startConversation(Scanner sc) throws JMSException {
// Initialize all producers and consumers
Thread t = setupReceiver();
// Continuous input loop
try {
while (sc.hasNextLine()) {
String line = sc.nextLine();
// Send message to receiver
sendMessage(createProducer(), line);
// Exit condition
if (line.equalsIgnoreCase("exit")) {
System.out.println("Exiting conversation...");
break;
}
}
} catch (Exception e) {
System.err.println("Error in continuous conversation: " + e.getMessage());
} finally {
t.interrupt();
}
}
// Setup producer and consumer for a receiver
private Thread setupReceiver() throws JMSException {
MessageConsumer consumer = createConsumer();
return getThread(consumer);
}
// Create producer queue for sending messages
private MessageProducer createProducer() throws JMSException {
String from = "queue_" + userId;
Queue prod = session.createQueue(from);
return session.createProducer(prod);
}
// Create consumer queue for receiving messages
private MessageConsumer createConsumer() throws JMSException {
String to = "queue_" + getCorespondent();
Queue cons = session.createQueue(to);
return session.createConsumer(cons);
}
// Send a message to the producer
private void sendMessage(MessageProducer producer, String line) throws JMSException {
TextMessage m = session.createTextMessage(line);
m.setStringProperty("sender", userId);
producer.send(m);
}
// Close connection and session
public void closeSession() throws JMSException {
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
}
// Returns a thread object to handle receiving messages
private Thread getThread(MessageConsumer consumer) {
Thread receivingThread = new Thread(() -> {
try {
// continously listen for messages
while (!Thread.currentThread().isInterrupted()) {
Message msg = consumer.receive(300);
if (msg instanceof TextMessage tm) {
// Status handling for online/offline
if(tm.getText().isBlank() || tm.getText().isEmpty()) {
setStatus(1);
continue;
}
// Exit condition because correspondent went offline
if(tm.getText().equalsIgnoreCase("exit")) {
setStatus(0);
continue;
}
// Display received message
String sender = tm.getStringProperty("sender");
String body = tm.getText();
System.out.println("[" + sender + "]: " + body);
}
}
} catch (javax.jms.JMSException e) {
// Silently handle interruption - normal shutdown behavior
if (!Thread.currentThread().isInterrupted()) {
System.err.println("Error receiving message: " + e.getMessage());
}
}
});
receivingThread.start();
// we are returning this so we can interrupt it later
return receivingThread;
}
// Prompt for user ID
public void promptForUser(Scanner sc){
System.out.print("Enter your user ID: ");
setUserId(prompt(sc));
// Instructions
System.out.print("Welcome "+getUserId());
System.out.println(" -> Type 'exit' to quit.");
}
// Prompt for correspondent ID
public void promptForCorespondent(Scanner sc){
System.out.print("Who do you want to chat with? ");
setCorespondent(prompt(sc));
System.out.println("You can chat with: " + getCorespondent());
}
// Generic prompt method prompts for whatever we need
private String prompt(Scanner sc) {
String user = "";
while (user.isEmpty()) {
try {
user = sc.next();
} catch (Exception e) {
System.err.println("Error reading correspondent ID: " + e.getMessage());
return null;
}
}
return user;
}
// Setters and getters
// user
public void setUserId(String userId) {
this.userId = userId;
}
public String getUserId() {
return userId;
}
// correspondent
public String getCorespondent() {
return corespondent;
}
public void setCorespondent(String corespondent) {
this.corespondent = corespondent;
}
// update status of correspondent and print status message
private void setStatus(int status) {
// just incase of duplicate status messages, if status is same, return
if(this.status == status) {
return;
}
this.status = status;
if(status == 0) {
System.out.println(getCorespondent()+" is now offline.");
} else {
System.out.println(getCorespondent()+" is now online.");
}
}
}

View File

@@ -0,0 +1,37 @@
package org.distributed;
/**
* Main.java
* by Tinsae Ghilay
* Entry point for the chat application.
* done for Distributed Systems Course - Week 2 Task 2
* Date: June 2024
*/
import java.util.Scanner;
public class Main {
// entry point
public static void main(String[] args) throws Exception {
// create a client
ChatHandler handler = new ChatHandler();
// init connection and session session
handler.initializeSession();
// create scanner for user input
Scanner sc = new Scanner(System.in);
// log in user
handler.promptForUser(sc);
// specify correspondent
handler.promptForCorespondent(sc);
// start conversation
handler.startConversation(sc);
// close session after all receivers are processed
handler.closeSession();
}
}