From c00e7cae31907b3e303aa5d094c4a8298c6f296e Mon Sep 17 00:00:00 2001 From: Tinsae Date: Fri, 12 Dec 2025 00:39:08 +0100 Subject: [PATCH] final fix --- .vscode/settings.json | 3 + README.md | 11 + .../java/org/distributed/ChatHandler.java | 193 +++++++++----- .../java/org/distributed/ChatHandler2.java | 242 ------------------ .../java/org/distributed/ChatHandler.java | 14 +- 5 files changed, 149 insertions(+), 314 deletions(-) create mode 100644 .vscode/settings.json delete mode 100644 week2_TinsaeGhilay/Task2/src/main/java/org/distributed/ChatHandler2.java diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..7b016a8 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,3 @@ +{ + "java.compile.nullAnalysis.mode": "automatic" +} \ No newline at end of file diff --git a/README.md b/README.md index e69de29..48bd92d 100644 --- a/README.md +++ b/README.md @@ -0,0 +1,11 @@ +## Task 2 +program starts, asks user id, and corespondent id. + +#### commands +- `exit` program exits + +## Task 3 +Task 3 follows the same proccess as Task 2. but implements additional commands to block and unblock correspondents +- `exit` program exits +- `-b ` block a user +- `-u ` unblocks user diff --git a/week2_TinsaeGhilay/Task2/src/main/java/org/distributed/ChatHandler.java b/week2_TinsaeGhilay/Task2/src/main/java/org/distributed/ChatHandler.java index 00157bd..c394ada 100644 --- a/week2_TinsaeGhilay/Task2/src/main/java/org/distributed/ChatHandler.java +++ b/week2_TinsaeGhilay/Task2/src/main/java/org/distributed/ChatHandler.java @@ -1,52 +1,44 @@ package org.distributed; -/** - * ChatHandler.java - * by Tinsae Ghilay - * Handles chat session, message sending and receiving using JMS queues. - * Uses ActiveMQ as the message broker. - * done for Distributed Systems Course - Week 2 Task 2 - * Date: June 2024 - */ - import java.util.Scanner; import javax.jms.*; import org.apache.activemq.ActiveMQConnectionFactory; -// ChatHandler class to manage chat sessions public class ChatHandler { - // chatting ids private String userId, corespondent; private Connection connection; - private Session session; + private Session senderSession, receiverSession; private final String TAG = "Task2_queue_"; private final String BROKER_URL = "tcp://localhost:61616"; - - // Initialize connection and session + /** + * Initialize JMS session + * @throws JMSException + */ public void initializeSession() throws JMSException { ConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL); this.connection = factory.createConnection(); this.connection.start(); - this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + this.senderSession = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + this.receiverSession = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE); } - // Start continuous conversation with multiple receivers + /** + * Start conversation loop + * @param sc + * @throws JMSException + */ public void startConversation(Scanner sc) throws JMSException { - // Initialize all producers and consumers - Thread t = setupReceiver(); + // init receiving thread + Thread t = setupReceiver(userId); // Continuous input loop try { - while (sc.hasNextLine()) { - String line = sc.nextLine(); + while (sc.hasNextLine()){ - // Send message to receiver - sendMessage(createProducer(), line); - - // Exit condition - if (line.equalsIgnoreCase("exit")) { - System.out.println("Exiting conversation..."); + // handle user input and send messages + // or if exit command, break loop + if(!respond(sc)){ break; } } @@ -57,71 +49,115 @@ public class ChatHandler { } } + /** + * Handle user input and send messages + * @param sc + * @throws JMSException + */ + private boolean respond(Scanner sc) throws JMSException { + String line = sc.nextLine(); + + // exit condition + if (line.equalsIgnoreCase("exit")) { + System.out.println("Exiting conversation..."); + return false; + } + + // Send message to receiver + sendMessage(createProducer(corespondent), line); + return true; + } + + // Setup producer and consumer for a receiver - private Thread setupReceiver() throws JMSException { + private Thread setupReceiver(String receiver) throws JMSException { + MessageConsumer contact = createConsumer(); - MessageConsumer consumer = createConsumer(); - - return getThread(consumer); + // add consumer to contacts map + return getThread(contact); } - // Create producer queue for sending messages - private MessageProducer createProducer() throws JMSException { - String from = TAG + userId; - Queue prod = session.createQueue(from); - return session.createProducer(prod); + /** + * Create a producer for a given contact ID so we can send messages + * @param id + * @return MessageProducer + * @throws JMSException + */ + private MessageProducer createProducer(String id) throws JMSException { + String from = TAG + id; + Queue prod = senderSession.createQueue(from); + return senderSession.createProducer(prod); } - // Create consumer queue for receiving messages + + /** + * Subscribe to topics meant for this user + * @return MessageConsumer + * @throws JMSException + */ private MessageConsumer createConsumer() throws JMSException { - String to = TAG+ getCorespondent(); - Queue cons = session.createQueue(to); - return session.createConsumer(cons); + String to = TAG + userId; + Queue cons = receiverSession.createQueue(to); + return receiverSession.createConsumer(cons); } - // Send a message to the producer + /** + * Send a messag + * @param producer + * @param line + * @throws JMSException + */ private void sendMessage(MessageProducer producer, String line) throws JMSException { - TextMessage m = session.createTextMessage(line); + + TextMessage m = senderSession.createTextMessage(line); + // add sender identity to message m.setStringProperty("sender", userId); producer.send(m); } - // Close connection and session + /** + * Close session and connection + * @throws JMSException + */ public void closeSession() throws JMSException { - if (session != null) { - session.close(); + if (senderSession != null) { + senderSession.close(); + } + if( receiverSession != null) { + receiverSession.close(); } if (connection != null) { connection.close(); } } - // Returns a thread object to handle receiving messages + /** + * Returns a thread object to handle receiving messages + * @param consumer + * @return Thread + */ private Thread getThread(MessageConsumer consumer) { + Thread receivingThread = new Thread(() -> { try { - // continously listen for messages + while (!Thread.currentThread().isInterrupted()) { - Message msg = consumer.receive(300); + + // receive message with timeout + Message msg = consumer.receive(100); // 100ms timeout if (msg instanceof TextMessage tm) { + // message contents String sender = tm.getStringProperty("sender"); - // Status handling for online/offline - if(tm.getText().isBlank() || tm.getText().isEmpty()) { - System.out.println("> " + sender + " is online."); - continue; - } - - // Exit condition because correspondent went offline - if(tm.getText().equalsIgnoreCase("exit")) { - System.out.println("> " + sender + " is offline."); - continue; - } - - // Display received message can be from anyone - //String sender = tm.getStringProperty("sender"); String body = tm.getText(); - System.out.println("[" + sender + "]: " + body); + + boolean isEmptyMessage = body.isBlank() || body.isEmpty(); + + // display only if not blocked and not empty + if (!isEmptyMessage) { + System.out.println("[" + sender + "]: " + body); + } + } } } catch (javax.jms.JMSException e) { @@ -131,30 +167,48 @@ public class ChatHandler { } } }); + + receivingThread.start(); // we are returning this so we can interrupt it later return receivingThread; } - // Prompt for user ID + + /** + * Prompt for user ID + * @param sc + */ public void promptForUser(Scanner sc){ System.out.print("Enter your user ID: "); setUserId(prompt(sc)); // Instructions - System.out.print("Welcome "+getUserId()); - System.out.println(" -> Type 'exit' to quit."); + System.out.println("Welcome "+getUserId()); + printCommands(); } - // Prompt for correspondent ID + private void printCommands() { + System.out.println("- To exit the chat: exit"); + } + + /** + * Prompt for correspondent ID + * @param sc + */ public void promptForCorespondent(Scanner sc){ System.out.print("Who do you want to chat with? "); - setCorespondent(prompt(sc)); - System.out.println("You can chat with: " + getCorespondent()); + String contact = prompt(sc); + setCorespondent(contact); + System.out.println("You can start with: " + getCorespondent()); } - // Generic prompt method prompts for whatever we need + /** + * Generic prompt method prompts for whatever we need + * @param sc Scanner object + * @return String + */ private String prompt(Scanner sc) { String user = ""; @@ -189,5 +243,4 @@ public class ChatHandler { public void setCorespondent(String corespondent) { this.corespondent = corespondent; } - } diff --git a/week2_TinsaeGhilay/Task2/src/main/java/org/distributed/ChatHandler2.java b/week2_TinsaeGhilay/Task2/src/main/java/org/distributed/ChatHandler2.java deleted file mode 100644 index aca9b7a..0000000 --- a/week2_TinsaeGhilay/Task2/src/main/java/org/distributed/ChatHandler2.java +++ /dev/null @@ -1,242 +0,0 @@ -package org.distributed; -import java.util.Scanner; -import javax.jms.*; -import org.apache.activemq.ActiveMQConnectionFactory; - -public class ChatHandler2 { - - private String userId, corespondent; - private Connection connection; - private Session senderSession, receiverSession; - private final String TAG = "Task2_queue_"; - private final String BROKER_URL = "tcp://localhost:61616"; - /** - * Initialize JMS session - * @throws JMSException - */ - public void initializeSession() throws JMSException { - ConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL); - this.connection = factory.createConnection(); - this.connection.start(); - this.senderSession = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - this.receiverSession = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - } - - /** - * Start conversation loop - * @param sc - * @throws JMSException - */ - public void startConversation(Scanner sc) throws JMSException { - - // init receiving thread - Thread t = setupReceiver(userId); - - // Continuous input loop - try { - while (sc.hasNextLine()){ - - // handle user input and send messages - // or if exit command, break loop - if(!respond(sc)){ - break; - } - } - } catch (Exception e) { - System.err.println("Error in continuous conversation: " + e.getMessage()); - } finally { - t.interrupt(); - } - } - - /** - * Handle user input and send messages - * @param sc - * @throws JMSException - */ - private boolean respond(Scanner sc) throws JMSException { - String line = sc.nextLine(); - - // exit condition - if (line.equalsIgnoreCase("exit")) { - System.out.println("Exiting conversation..."); - return false; - } - - // Send message to receiver - sendMessage(createProducer(corespondent), line); - return true; - } - - - // Setup producer and consumer for a receiver - private Thread setupReceiver(String receiver) throws JMSException { - MessageConsumer contact = createConsumer(); - - // add consumer to contacts map - return getThread(contact); - } - - /** - * Create a producer for a given contact ID so we can send messages - * @param id - * @return MessageProducer - * @throws JMSException - */ - private MessageProducer createProducer(String id) throws JMSException { - String from = TAG + id; - Queue prod = senderSession.createQueue(from); - return senderSession.createProducer(prod); - } - - - /** - * Subscribe to topics meant for this user - * @return MessageConsumer - * @throws JMSException - */ - private MessageConsumer createConsumer() throws JMSException { - String to = TAG + userId; - Queue cons = receiverSession.createQueue(to); - return receiverSession.createConsumer(cons); - } - - /** - * Send a messag - * @param producer - * @param line - * @throws JMSException - */ - private void sendMessage(MessageProducer producer, String line) throws JMSException { - - TextMessage m = senderSession.createTextMessage(line); - // add sender identity to message - m.setStringProperty("sender", userId); - producer.send(m); - } - - /** - * Close session and connection - * @throws JMSException - */ - public void closeSession() throws JMSException { - if (senderSession != null) { - senderSession.close(); - } - if( receiverSession != null) { - receiverSession.close(); - } - if (connection != null) { - connection.close(); - } - } - - /** - * Returns a thread object to handle receiving messages - * @param consumer - * @return Thread - */ - private Thread getThread(MessageConsumer consumer) { - - Thread receivingThread = new Thread(() -> { - try { - - while (!Thread.currentThread().isInterrupted()) { - - // receive message with timeout - Message msg = consumer.receive(100); // 100ms timeout - if (msg instanceof TextMessage tm) { - - // message contents - String sender = tm.getStringProperty("sender"); - String body = tm.getText(); - - boolean isEmptyMessage = body.isBlank() || body.isEmpty(); - - // display only if not blocked and not empty - if (!isEmptyMessage) { - System.out.println("[" + sender + "]: " + body); - } - - } - } - } catch (javax.jms.JMSException e) { - // Silently handle interruption - normal shutdown behavior - if (!Thread.currentThread().isInterrupted()) { - System.err.println("Error receiving message: " + e.getMessage()); - } - } - }); - - - receivingThread.start(); - // we are returning this so we can interrupt it later - return receivingThread; - } - - - /** - * Prompt for user ID - * @param sc - */ - public void promptForUser(Scanner sc){ - - System.out.print("Enter your user ID: "); - setUserId(prompt(sc)); - // Instructions - System.out.print("Welcome "+getUserId()); - System.out.println(" -> Type 'exit' to quit."); - } - - /** - * Prompt for correspondent ID - * @param sc - */ - public void promptForCorespondent(Scanner sc){ - - System.out.print("Who do you want to chat with? "); - String contact = prompt(sc); - setCorespondent(contact); - System.out.println("You can start with: " + getCorespondent()); - } - - /** - * Generic prompt method prompts for whatever we need - * @param sc Scanner object - * @return String - */ - private String prompt(Scanner sc) { - - String user = ""; - - while (user.isEmpty()) { - try { - user = sc.next(); - - } catch (Exception e) { - System.err.println("Error reading correspondent ID: " + e.getMessage()); - return null; - } - } - return user; - } - - // Setters and getters - // user - public void setUserId(String userId) { - this.userId = userId; - } - - public String getUserId() { - return userId; - } - - // correspondent - public String getCorespondent() { - return corespondent; - } - - public void setCorespondent(String corespondent) { - this.corespondent = corespondent; - } -} diff --git a/week2_TinsaeGhilay/Task3/src/main/java/org/distributed/ChatHandler.java b/week2_TinsaeGhilay/Task3/src/main/java/org/distributed/ChatHandler.java index 4d2638b..33c8555 100644 --- a/week2_TinsaeGhilay/Task3/src/main/java/org/distributed/ChatHandler.java +++ b/week2_TinsaeGhilay/Task3/src/main/java/org/distributed/ChatHandler.java @@ -292,9 +292,19 @@ public class ChatHandler { setUserId(prompt(sc)); // Instructions System.out.print("Welcome "+getUserId()); - System.out.println(" -> Type 'exit' to quit."); + printCommands(); } + /** + * Print available commands + */ + private void printCommands() { + System.out.println("\nYou can use the following commands during the chat:"); + System.out.println("- To block a contact: -b "); + System.out.println("- To unblock a contact: -u "); + System.out.println("- To exit the chat: exit"); + } + /** * Prompt for correspondent ID * @param sc @@ -309,7 +319,7 @@ public class ChatHandler { System.out.println(contact + " is currently blocked. Unblock to chat."); return; } - System.out.println("You can start with: " + getCorespondent()); + System.out.println("You can start chatting with: " + getCorespondent()); } /**