final fix

This commit is contained in:
2025-12-12 00:39:08 +01:00
parent 38e99a29e2
commit c00e7cae31
5 changed files with 149 additions and 314 deletions

3
.vscode/settings.json vendored Normal file
View File

@@ -0,0 +1,3 @@
{
"java.compile.nullAnalysis.mode": "automatic"
}

View File

@@ -0,0 +1,11 @@
## Task 2
program starts, asks user id, and corespondent id.
#### commands
- `exit` program exits
## Task 3
Task 3 follows the same proccess as Task 2. but implements additional commands to block and unblock correspondents
- `exit` program exits
- `-b <user>` block a user
- `-u <user>` unblocks user

View File

@@ -1,52 +1,44 @@
package org.distributed; package org.distributed;
/**
* ChatHandler.java
* by Tinsae Ghilay
* Handles chat session, message sending and receiving using JMS queues.
* Uses ActiveMQ as the message broker.
* done for Distributed Systems Course - Week 2 Task 2
* Date: June 2024
*/
import java.util.Scanner; import java.util.Scanner;
import javax.jms.*; import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
// ChatHandler class to manage chat sessions
public class ChatHandler { public class ChatHandler {
// chatting ids
private String userId, corespondent; private String userId, corespondent;
private Connection connection; private Connection connection;
private Session session; private Session senderSession, receiverSession;
private final String TAG = "Task2_queue_"; private final String TAG = "Task2_queue_";
private final String BROKER_URL = "tcp://localhost:61616"; private final String BROKER_URL = "tcp://localhost:61616";
/**
// Initialize connection and session * Initialize JMS session
* @throws JMSException
*/
public void initializeSession() throws JMSException { public void initializeSession() throws JMSException {
ConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL); ConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL);
this.connection = factory.createConnection(); this.connection = factory.createConnection();
this.connection.start(); this.connection.start();
this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE); this.senderSession = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
this.receiverSession = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
} }
// Start continuous conversation with multiple receivers /**
* Start conversation loop
* @param sc
* @throws JMSException
*/
public void startConversation(Scanner sc) throws JMSException { public void startConversation(Scanner sc) throws JMSException {
// Initialize all producers and consumers // init receiving thread
Thread t = setupReceiver(); Thread t = setupReceiver(userId);
// Continuous input loop // Continuous input loop
try { try {
while (sc.hasNextLine()) { while (sc.hasNextLine()){
String line = sc.nextLine();
// Send message to receiver // handle user input and send messages
sendMessage(createProducer(), line); // or if exit command, break loop
if(!respond(sc)){
// Exit condition
if (line.equalsIgnoreCase("exit")) {
System.out.println("Exiting conversation...");
break; break;
} }
} }
@@ -57,71 +49,115 @@ public class ChatHandler {
} }
} }
/**
* Handle user input and send messages
* @param sc
* @throws JMSException
*/
private boolean respond(Scanner sc) throws JMSException {
String line = sc.nextLine();
// exit condition
if (line.equalsIgnoreCase("exit")) {
System.out.println("Exiting conversation...");
return false;
}
// Send message to receiver
sendMessage(createProducer(corespondent), line);
return true;
}
// Setup producer and consumer for a receiver // Setup producer and consumer for a receiver
private Thread setupReceiver() throws JMSException { private Thread setupReceiver(String receiver) throws JMSException {
MessageConsumer contact = createConsumer();
MessageConsumer consumer = createConsumer(); // add consumer to contacts map
return getThread(contact);
return getThread(consumer);
} }
// Create producer queue for sending messages /**
private MessageProducer createProducer() throws JMSException { * Create a producer for a given contact ID so we can send messages
String from = TAG + userId; * @param id
Queue prod = session.createQueue(from); * @return MessageProducer
return session.createProducer(prod); * @throws JMSException
*/
private MessageProducer createProducer(String id) throws JMSException {
String from = TAG + id;
Queue prod = senderSession.createQueue(from);
return senderSession.createProducer(prod);
} }
// Create consumer queue for receiving messages
/**
* Subscribe to topics meant for this user
* @return MessageConsumer
* @throws JMSException
*/
private MessageConsumer createConsumer() throws JMSException { private MessageConsumer createConsumer() throws JMSException {
String to = TAG+ getCorespondent(); String to = TAG + userId;
Queue cons = session.createQueue(to); Queue cons = receiverSession.createQueue(to);
return session.createConsumer(cons); return receiverSession.createConsumer(cons);
} }
// Send a message to the producer /**
* Send a messag
* @param producer
* @param line
* @throws JMSException
*/
private void sendMessage(MessageProducer producer, String line) throws JMSException { private void sendMessage(MessageProducer producer, String line) throws JMSException {
TextMessage m = session.createTextMessage(line);
TextMessage m = senderSession.createTextMessage(line);
// add sender identity to message
m.setStringProperty("sender", userId); m.setStringProperty("sender", userId);
producer.send(m); producer.send(m);
} }
// Close connection and session /**
* Close session and connection
* @throws JMSException
*/
public void closeSession() throws JMSException { public void closeSession() throws JMSException {
if (session != null) { if (senderSession != null) {
session.close(); senderSession.close();
}
if( receiverSession != null) {
receiverSession.close();
} }
if (connection != null) { if (connection != null) {
connection.close(); connection.close();
} }
} }
// Returns a thread object to handle receiving messages /**
* Returns a thread object to handle receiving messages
* @param consumer
* @return Thread
*/
private Thread getThread(MessageConsumer consumer) { private Thread getThread(MessageConsumer consumer) {
Thread receivingThread = new Thread(() -> { Thread receivingThread = new Thread(() -> {
try { try {
// continously listen for messages
while (!Thread.currentThread().isInterrupted()) { while (!Thread.currentThread().isInterrupted()) {
Message msg = consumer.receive(300);
// receive message with timeout
Message msg = consumer.receive(100); // 100ms timeout
if (msg instanceof TextMessage tm) { if (msg instanceof TextMessage tm) {
// message contents
String sender = tm.getStringProperty("sender"); String sender = tm.getStringProperty("sender");
// Status handling for online/offline
if(tm.getText().isBlank() || tm.getText().isEmpty()) {
System.out.println("> " + sender + " is online.");
continue;
}
// Exit condition because correspondent went offline
if(tm.getText().equalsIgnoreCase("exit")) {
System.out.println("> " + sender + " is offline.");
continue;
}
// Display received message can be from anyone
//String sender = tm.getStringProperty("sender");
String body = tm.getText(); String body = tm.getText();
System.out.println("[" + sender + "]: " + body);
boolean isEmptyMessage = body.isBlank() || body.isEmpty();
// display only if not blocked and not empty
if (!isEmptyMessage) {
System.out.println("[" + sender + "]: " + body);
}
} }
} }
} catch (javax.jms.JMSException e) { } catch (javax.jms.JMSException e) {
@@ -131,30 +167,48 @@ public class ChatHandler {
} }
} }
}); });
receivingThread.start(); receivingThread.start();
// we are returning this so we can interrupt it later // we are returning this so we can interrupt it later
return receivingThread; return receivingThread;
} }
// Prompt for user ID
/**
* Prompt for user ID
* @param sc
*/
public void promptForUser(Scanner sc){ public void promptForUser(Scanner sc){
System.out.print("Enter your user ID: "); System.out.print("Enter your user ID: ");
setUserId(prompt(sc)); setUserId(prompt(sc));
// Instructions // Instructions
System.out.print("Welcome "+getUserId()); System.out.println("Welcome "+getUserId());
System.out.println(" -> Type 'exit' to quit."); printCommands();
} }
// Prompt for correspondent ID private void printCommands() {
System.out.println("- To exit the chat: exit");
}
/**
* Prompt for correspondent ID
* @param sc
*/
public void promptForCorespondent(Scanner sc){ public void promptForCorespondent(Scanner sc){
System.out.print("Who do you want to chat with? "); System.out.print("Who do you want to chat with? ");
setCorespondent(prompt(sc)); String contact = prompt(sc);
System.out.println("You can chat with: " + getCorespondent()); setCorespondent(contact);
System.out.println("You can start with: " + getCorespondent());
} }
// Generic prompt method prompts for whatever we need /**
* Generic prompt method prompts for whatever we need
* @param sc Scanner object
* @return String
*/
private String prompt(Scanner sc) { private String prompt(Scanner sc) {
String user = ""; String user = "";
@@ -189,5 +243,4 @@ public class ChatHandler {
public void setCorespondent(String corespondent) { public void setCorespondent(String corespondent) {
this.corespondent = corespondent; this.corespondent = corespondent;
} }
} }

View File

@@ -1,242 +0,0 @@
package org.distributed;
import java.util.Scanner;
import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;
public class ChatHandler2 {
private String userId, corespondent;
private Connection connection;
private Session senderSession, receiverSession;
private final String TAG = "Task2_queue_";
private final String BROKER_URL = "tcp://localhost:61616";
/**
* Initialize JMS session
* @throws JMSException
*/
public void initializeSession() throws JMSException {
ConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL);
this.connection = factory.createConnection();
this.connection.start();
this.senderSession = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
this.receiverSession = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
/**
* Start conversation loop
* @param sc
* @throws JMSException
*/
public void startConversation(Scanner sc) throws JMSException {
// init receiving thread
Thread t = setupReceiver(userId);
// Continuous input loop
try {
while (sc.hasNextLine()){
// handle user input and send messages
// or if exit command, break loop
if(!respond(sc)){
break;
}
}
} catch (Exception e) {
System.err.println("Error in continuous conversation: " + e.getMessage());
} finally {
t.interrupt();
}
}
/**
* Handle user input and send messages
* @param sc
* @throws JMSException
*/
private boolean respond(Scanner sc) throws JMSException {
String line = sc.nextLine();
// exit condition
if (line.equalsIgnoreCase("exit")) {
System.out.println("Exiting conversation...");
return false;
}
// Send message to receiver
sendMessage(createProducer(corespondent), line);
return true;
}
// Setup producer and consumer for a receiver
private Thread setupReceiver(String receiver) throws JMSException {
MessageConsumer contact = createConsumer();
// add consumer to contacts map
return getThread(contact);
}
/**
* Create a producer for a given contact ID so we can send messages
* @param id
* @return MessageProducer
* @throws JMSException
*/
private MessageProducer createProducer(String id) throws JMSException {
String from = TAG + id;
Queue prod = senderSession.createQueue(from);
return senderSession.createProducer(prod);
}
/**
* Subscribe to topics meant for this user
* @return MessageConsumer
* @throws JMSException
*/
private MessageConsumer createConsumer() throws JMSException {
String to = TAG + userId;
Queue cons = receiverSession.createQueue(to);
return receiverSession.createConsumer(cons);
}
/**
* Send a messag
* @param producer
* @param line
* @throws JMSException
*/
private void sendMessage(MessageProducer producer, String line) throws JMSException {
TextMessage m = senderSession.createTextMessage(line);
// add sender identity to message
m.setStringProperty("sender", userId);
producer.send(m);
}
/**
* Close session and connection
* @throws JMSException
*/
public void closeSession() throws JMSException {
if (senderSession != null) {
senderSession.close();
}
if( receiverSession != null) {
receiverSession.close();
}
if (connection != null) {
connection.close();
}
}
/**
* Returns a thread object to handle receiving messages
* @param consumer
* @return Thread
*/
private Thread getThread(MessageConsumer consumer) {
Thread receivingThread = new Thread(() -> {
try {
while (!Thread.currentThread().isInterrupted()) {
// receive message with timeout
Message msg = consumer.receive(100); // 100ms timeout
if (msg instanceof TextMessage tm) {
// message contents
String sender = tm.getStringProperty("sender");
String body = tm.getText();
boolean isEmptyMessage = body.isBlank() || body.isEmpty();
// display only if not blocked and not empty
if (!isEmptyMessage) {
System.out.println("[" + sender + "]: " + body);
}
}
}
} catch (javax.jms.JMSException e) {
// Silently handle interruption - normal shutdown behavior
if (!Thread.currentThread().isInterrupted()) {
System.err.println("Error receiving message: " + e.getMessage());
}
}
});
receivingThread.start();
// we are returning this so we can interrupt it later
return receivingThread;
}
/**
* Prompt for user ID
* @param sc
*/
public void promptForUser(Scanner sc){
System.out.print("Enter your user ID: ");
setUserId(prompt(sc));
// Instructions
System.out.print("Welcome "+getUserId());
System.out.println(" -> Type 'exit' to quit.");
}
/**
* Prompt for correspondent ID
* @param sc
*/
public void promptForCorespondent(Scanner sc){
System.out.print("Who do you want to chat with? ");
String contact = prompt(sc);
setCorespondent(contact);
System.out.println("You can start with: " + getCorespondent());
}
/**
* Generic prompt method prompts for whatever we need
* @param sc Scanner object
* @return String
*/
private String prompt(Scanner sc) {
String user = "";
while (user.isEmpty()) {
try {
user = sc.next();
} catch (Exception e) {
System.err.println("Error reading correspondent ID: " + e.getMessage());
return null;
}
}
return user;
}
// Setters and getters
// user
public void setUserId(String userId) {
this.userId = userId;
}
public String getUserId() {
return userId;
}
// correspondent
public String getCorespondent() {
return corespondent;
}
public void setCorespondent(String corespondent) {
this.corespondent = corespondent;
}
}

View File

@@ -292,9 +292,19 @@ public class ChatHandler {
setUserId(prompt(sc)); setUserId(prompt(sc));
// Instructions // Instructions
System.out.print("Welcome "+getUserId()); System.out.print("Welcome "+getUserId());
System.out.println(" -> Type 'exit' to quit."); printCommands();
} }
/**
* Print available commands
*/
private void printCommands() {
System.out.println("\nYou can use the following commands during the chat:");
System.out.println("- To block a contact: -b <contact_id>");
System.out.println("- To unblock a contact: -u <contact_id>");
System.out.println("- To exit the chat: exit");
}
/** /**
* Prompt for correspondent ID * Prompt for correspondent ID
* @param sc * @param sc
@@ -309,7 +319,7 @@ public class ChatHandler {
System.out.println(contact + " is currently blocked. Unblock to chat."); System.out.println(contact + " is currently blocked. Unblock to chat.");
return; return;
} }
System.out.println("You can start with: " + getCorespondent()); System.out.println("You can start chatting with: " + getCorespondent());
} }
/** /**