From 38e99a29e2d3bb318f7d5c3ad2c42d90442c8513 Mon Sep 17 00:00:00 2001 From: Tinsae Date: Fri, 12 Dec 2025 00:17:14 +0100 Subject: [PATCH] all may be done --- .../java/org/distributed/ChatHandler.java | 31 +-- .../java/org/distributed/ChatHandler2.java | 242 ++++++++++++++++++ .../java/org/distributed/ChatHandler.java | 146 ++++------- 3 files changed, 303 insertions(+), 116 deletions(-) create mode 100644 week2_TinsaeGhilay/Task2/src/main/java/org/distributed/ChatHandler2.java diff --git a/week2_TinsaeGhilay/Task2/src/main/java/org/distributed/ChatHandler.java b/week2_TinsaeGhilay/Task2/src/main/java/org/distributed/ChatHandler.java index 7dda9fc..00157bd 100644 --- a/week2_TinsaeGhilay/Task2/src/main/java/org/distributed/ChatHandler.java +++ b/week2_TinsaeGhilay/Task2/src/main/java/org/distributed/ChatHandler.java @@ -19,11 +19,11 @@ public class ChatHandler { private String userId, corespondent; private Connection connection; private Session session; - int status = 0; // 0: offline, 1: online + private final String TAG = "Task2_queue_"; + private final String BROKER_URL = "tcp://localhost:61616"; // Initialize connection and session public void initializeSession() throws JMSException { - String BROKER_URL = "tcp://localhost:61616"; ConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL); this.connection = factory.createConnection(); this.connection.start(); @@ -67,14 +67,14 @@ public class ChatHandler { // Create producer queue for sending messages private MessageProducer createProducer() throws JMSException { - String from = "queue_" + userId; + String from = TAG + userId; Queue prod = session.createQueue(from); return session.createProducer(prod); } // Create consumer queue for receiving messages private MessageConsumer createConsumer() throws JMSException { - String to = "queue_" + getCorespondent(); + String to = TAG+ getCorespondent(); Queue cons = session.createQueue(to); return session.createConsumer(cons); } @@ -105,20 +105,21 @@ public class ChatHandler { Message msg = consumer.receive(300); if (msg instanceof TextMessage tm) { + String sender = tm.getStringProperty("sender"); // Status handling for online/offline if(tm.getText().isBlank() || tm.getText().isEmpty()) { - setStatus(1); + System.out.println("> " + sender + " is online."); continue; } // Exit condition because correspondent went offline if(tm.getText().equalsIgnoreCase("exit")) { - setStatus(0); + System.out.println("> " + sender + " is offline."); continue; } - // Display received message - String sender = tm.getStringProperty("sender"); + // Display received message can be from anyone + //String sender = tm.getStringProperty("sender"); String body = tm.getText(); System.out.println("[" + sender + "]: " + body); } @@ -189,18 +190,4 @@ public class ChatHandler { this.corespondent = corespondent; } - // update status of correspondent and print status message - private void setStatus(int status) { - - // just incase of duplicate status messages, if status is same, return - if(this.status == status) { - return; - } - this.status = status; - if(status == 0) { - System.out.println(getCorespondent()+" is now offline."); - } else { - System.out.println(getCorespondent()+" is now online."); - } - } } diff --git a/week2_TinsaeGhilay/Task2/src/main/java/org/distributed/ChatHandler2.java b/week2_TinsaeGhilay/Task2/src/main/java/org/distributed/ChatHandler2.java new file mode 100644 index 0000000..aca9b7a --- /dev/null +++ b/week2_TinsaeGhilay/Task2/src/main/java/org/distributed/ChatHandler2.java @@ -0,0 +1,242 @@ +package org.distributed; +import java.util.Scanner; +import javax.jms.*; +import org.apache.activemq.ActiveMQConnectionFactory; + +public class ChatHandler2 { + + private String userId, corespondent; + private Connection connection; + private Session senderSession, receiverSession; + private final String TAG = "Task2_queue_"; + private final String BROKER_URL = "tcp://localhost:61616"; + /** + * Initialize JMS session + * @throws JMSException + */ + public void initializeSession() throws JMSException { + ConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL); + this.connection = factory.createConnection(); + this.connection.start(); + this.senderSession = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + this.receiverSession = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + } + + /** + * Start conversation loop + * @param sc + * @throws JMSException + */ + public void startConversation(Scanner sc) throws JMSException { + + // init receiving thread + Thread t = setupReceiver(userId); + + // Continuous input loop + try { + while (sc.hasNextLine()){ + + // handle user input and send messages + // or if exit command, break loop + if(!respond(sc)){ + break; + } + } + } catch (Exception e) { + System.err.println("Error in continuous conversation: " + e.getMessage()); + } finally { + t.interrupt(); + } + } + + /** + * Handle user input and send messages + * @param sc + * @throws JMSException + */ + private boolean respond(Scanner sc) throws JMSException { + String line = sc.nextLine(); + + // exit condition + if (line.equalsIgnoreCase("exit")) { + System.out.println("Exiting conversation..."); + return false; + } + + // Send message to receiver + sendMessage(createProducer(corespondent), line); + return true; + } + + + // Setup producer and consumer for a receiver + private Thread setupReceiver(String receiver) throws JMSException { + MessageConsumer contact = createConsumer(); + + // add consumer to contacts map + return getThread(contact); + } + + /** + * Create a producer for a given contact ID so we can send messages + * @param id + * @return MessageProducer + * @throws JMSException + */ + private MessageProducer createProducer(String id) throws JMSException { + String from = TAG + id; + Queue prod = senderSession.createQueue(from); + return senderSession.createProducer(prod); + } + + + /** + * Subscribe to topics meant for this user + * @return MessageConsumer + * @throws JMSException + */ + private MessageConsumer createConsumer() throws JMSException { + String to = TAG + userId; + Queue cons = receiverSession.createQueue(to); + return receiverSession.createConsumer(cons); + } + + /** + * Send a messag + * @param producer + * @param line + * @throws JMSException + */ + private void sendMessage(MessageProducer producer, String line) throws JMSException { + + TextMessage m = senderSession.createTextMessage(line); + // add sender identity to message + m.setStringProperty("sender", userId); + producer.send(m); + } + + /** + * Close session and connection + * @throws JMSException + */ + public void closeSession() throws JMSException { + if (senderSession != null) { + senderSession.close(); + } + if( receiverSession != null) { + receiverSession.close(); + } + if (connection != null) { + connection.close(); + } + } + + /** + * Returns a thread object to handle receiving messages + * @param consumer + * @return Thread + */ + private Thread getThread(MessageConsumer consumer) { + + Thread receivingThread = new Thread(() -> { + try { + + while (!Thread.currentThread().isInterrupted()) { + + // receive message with timeout + Message msg = consumer.receive(100); // 100ms timeout + if (msg instanceof TextMessage tm) { + + // message contents + String sender = tm.getStringProperty("sender"); + String body = tm.getText(); + + boolean isEmptyMessage = body.isBlank() || body.isEmpty(); + + // display only if not blocked and not empty + if (!isEmptyMessage) { + System.out.println("[" + sender + "]: " + body); + } + + } + } + } catch (javax.jms.JMSException e) { + // Silently handle interruption - normal shutdown behavior + if (!Thread.currentThread().isInterrupted()) { + System.err.println("Error receiving message: " + e.getMessage()); + } + } + }); + + + receivingThread.start(); + // we are returning this so we can interrupt it later + return receivingThread; + } + + + /** + * Prompt for user ID + * @param sc + */ + public void promptForUser(Scanner sc){ + + System.out.print("Enter your user ID: "); + setUserId(prompt(sc)); + // Instructions + System.out.print("Welcome "+getUserId()); + System.out.println(" -> Type 'exit' to quit."); + } + + /** + * Prompt for correspondent ID + * @param sc + */ + public void promptForCorespondent(Scanner sc){ + + System.out.print("Who do you want to chat with? "); + String contact = prompt(sc); + setCorespondent(contact); + System.out.println("You can start with: " + getCorespondent()); + } + + /** + * Generic prompt method prompts for whatever we need + * @param sc Scanner object + * @return String + */ + private String prompt(Scanner sc) { + + String user = ""; + + while (user.isEmpty()) { + try { + user = sc.next(); + + } catch (Exception e) { + System.err.println("Error reading correspondent ID: " + e.getMessage()); + return null; + } + } + return user; + } + + // Setters and getters + // user + public void setUserId(String userId) { + this.userId = userId; + } + + public String getUserId() { + return userId; + } + + // correspondent + public String getCorespondent() { + return corespondent; + } + + public void setCorespondent(String corespondent) { + this.corespondent = corespondent; + } +} diff --git a/week2_TinsaeGhilay/Task3/src/main/java/org/distributed/ChatHandler.java b/week2_TinsaeGhilay/Task3/src/main/java/org/distributed/ChatHandler.java index 27565c0..4d2638b 100644 --- a/week2_TinsaeGhilay/Task3/src/main/java/org/distributed/ChatHandler.java +++ b/week2_TinsaeGhilay/Task3/src/main/java/org/distributed/ChatHandler.java @@ -9,8 +9,7 @@ public class ChatHandler { private String userId, corespondent; private Connection connection; - private Session /*session,*/ senderSession, receiverSession; - private int status = 0; // 0: offline, 1: online + private Session senderSession, receiverSession; private final Map blocked = new ConcurrentHashMap<>(); private final String TAG = "Task3_queue_"; private final String BROKER_URL = "tcp://localhost:61616"; @@ -22,7 +21,6 @@ public class ChatHandler { ConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL); this.connection = factory.createConnection(); this.connection.start(); - //this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE); this.senderSession = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE); this.receiverSession = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE); } @@ -65,17 +63,18 @@ public class ChatHandler { // commands to block or unblock contact if(line.startsWith("-")){ delegateBlockingManagement(sc,line); - //return true; + // handling edge case where instead of user, exit comand is given + return !corespondent.equalsIgnoreCase("exit"); } if (line.equalsIgnoreCase("exit")) { System.out.println("Exiting conversation..."); - //return false; + return false; } // Send message to receiver sendMessage(createProducer(corespondent), line); - return !line.equalsIgnoreCase("exit");//true; + return true; } @@ -88,8 +87,17 @@ public class ChatHandler { // block command is -b followed by contact id if(line.startsWith("-b")){ - blockContact(line); - promptForCorespondent(sc); + try { + boolean wasCorespondent = blockContact(line); + // if we blocked current correspondent, prompt for new one + if(wasCorespondent){ + System.out.println("You have blocked your current correspondent. Please choose a new one."); + promptForCorespondent(sc); + } + } catch (JMSException e) { + System.err.println(e.getMessage()); + } + return; } // unblock command is -u followed by contact id @@ -116,7 +124,6 @@ public class ChatHandler { MessageConsumer contact = createConsumer(); // add consumer to contacts map - //contacts.put(receiver, contact); return getThread(contact); } @@ -136,21 +143,32 @@ public class ChatHandler { * Block a contact * @param command */ - private void blockContact(String command) { + private boolean blockContact(String command) throws JMSException { // remove the command part to get the contact id String contact = command.replace("-b", "").trim(); - try { - MessageProducer ignored = createProducer(contact); - // remove from contacts map - // and close his ass - if(ignored != null){ - ignored.close(); - blocked.put(contact, ignored); - System.out.println(contact + " has been blocked."); - } - } catch (JMSException e) { - System.err.println("Error unblocking consumer: " + e.getMessage()); + if(contact.isEmpty()){ + throw new JMSException("No contact specified to block."); } + + boolean isCurrentCorespondent = contact.equals(getCorespondent()); + MessageProducer ignored = createProducer(contact); + + // check if already blocked + if(blocked.containsKey(contact)){ + System.out.println(contact + " is already blocked."); + return isCurrentCorespondent; + } + // remove from contacts map + // and close his ass + if(ignored != null){ + ignored.close(); + blocked.put(contact, ignored); + System.out.println(contact + " has been blocked."); + return isCurrentCorespondent; + }else{ + throw new JMSException("Could not block contact: " + contact); + } + } @@ -219,6 +237,7 @@ public class ChatHandler { /** * Returns a thread object to handle receiving messages + * checks if sender is blocked before displaying * @param consumer * @return Thread */ @@ -232,13 +251,18 @@ public class ChatHandler { // receive message with timeout Message msg = consumer.receive(100); // 100ms timeout if (msg instanceof TextMessage tm) { - // handle received message - String sender = tm.getStringProperty("sender"); - // display message if sender is not blocked - if (!isBlocked(sender)) { - String body = tm.getText(); - displayMessage(sender, body); + // message contents + String sender = tm.getStringProperty("sender"); + String body = tm.getText(); + + // display conditions + boolean isBlocked = blocked.containsKey(sender); + boolean isEmptyMessage = body.isBlank() || body.isEmpty(); + + // display only if not blocked and not empty + if (!isBlocked && !isEmptyMessage) { + System.out.println("[" + sender + "]: " + body); } } @@ -258,47 +282,6 @@ public class ChatHandler { } - /** - * Display received message - * Checks if sender is blocked - * checks if message is empty for status update - * checks for exit condition - * @param tm - * @throws JMSException - */ - - private void displayMessage(String sender, String body) { - - - // Exit condition because correspondent went offline - if(body.equalsIgnoreCase("exit")) { - setStatus(0); - return; - } - - // display only if not empty - if(!body.isBlank() || !body.isEmpty()) { - // Status handling for online/offline - System.out.println("[" + sender + "]: " + body); - } - setStatus(1); - - } - - /** - * Check if sender is blocked - * if sender is blocked, print info message - * @param sender - * @return - */ - private boolean isBlocked(String sender) { - boolean isBlocked = blocked.containsKey(sender); - if(isBlocked){ - System.out.println(" >> Message from '"+sender+"' ignored. because user is blocked."); - } - return isBlocked; - } - /** * Prompt for user ID * @param sc @@ -306,7 +289,6 @@ public class ChatHandler { public void promptForUser(Scanner sc){ System.out.print("Enter your user ID: "); - //userId = prompt(sc); setUserId(prompt(sc)); // Instructions System.out.print("Welcome "+getUserId()); @@ -327,6 +309,7 @@ public class ChatHandler { System.out.println(contact + " is currently blocked. Unblock to chat."); return; } + System.out.println("You can start with: " + getCorespondent()); } /** @@ -367,30 +350,5 @@ public class ChatHandler { public void setCorespondent(String corespondent) { this.corespondent = corespondent; - System.out.println("You can chat with: " + corespondent); - // reset old status when new corepondent is set - setStatus(0); - } - - /** - * Updates corespondents status - * @param status - */ - private void setStatus(int status) { - - // just in case of duplicate status messages, if status is same, return - if(this.status == status) { - return; - } - - // update status - this.status = status; - - // and print accordingly - if(status == 0) { - System.out.println(getCorespondent()+" is now offline."); - } else { - System.out.println(getCorespondent()+" is now online."); - } } }