low level programmer

Main | Next page »
星期日 八月 29, 2010

拿來測試 UDP 的 UI

description

此 UI 可以拿來體會 UDP 封包會丟棄的現象(一次送上千個封包就會有感覺).
UDPSenderThread 可以指定要送到哪個 port,
在輸入框輸入要發幾個 packet 的數字後按 Enter 就可以發 UDP 封包到指定的 port.
UDPReceiverThread 可以指定要聽哪個 port,
收到 packet 之後就會顯示目前收到第幾個 UDP 封包與內容.
按 Reset 可以重新計算收到幾個封包

reference

Java Network Programming, 3e

codes

public class TestMain {

    private static final int MAX_PACKET_SIZE = 8192;

    public static void main(String[] args) {
        TestMain test = new TestMain();
        test.test();
    }

    private void test() {
        new UDPReceiverThread(1234).start();
        new UDPSenderThread(1234).start();
    }

    class UDPReceiverThread extends Thread {
        private int listenport;
        public UDPReceiverThread(int listenport) {
            this.listenport = listenport;
        }
        @Override
        public void run() {
            try {
                final AtomicInteger receivedPackets = new AtomicInteger();
                final JFrame f = new JFrame("Receiver : listen to port " + listenport);
                f.setDefaultCloseOperation(WindowConstants.EXIT_ON_CLOSE);
                f.getContentPane().setLayout(new BorderLayout());
                final JPanel p = new JPanel(new BorderLayout());
                f.getContentPane().add( p, BorderLayout.CENTER );
                final JTextArea status = new JTextArea(30, 30);
                p.add(new JScrollPane(status), BorderLayout.CENTER);
                JButton resetBtn = new JButton("Reset");
                resetBtn.addActionListener(new ActionListener(){
                    public void actionPerformed(ActionEvent e) {
                        status.setText("");
                        receivedPackets.set(0);
                    }
                });
                p.add(resetBtn, BorderLayout.NORTH);
                f.pack();
                f.setVisible(true);
                byte[] data = new byte[MAX_PACKET_SIZE];
                final DatagramSocket receiver = new DatagramSocket(listenport);
                DatagramPacket packet = new DatagramPacket(data, MAX_PACKET_SIZE);
                f.addWindowListener(new WindowAdapter() {
                    @Override
                    public void windowClosing(WindowEvent e) {
                        receiver.close();
                    }
                });
                while (true) {
                    receiver.receive(packet);
                    status.insert("Receive the " + receivedPackets.incrementAndGet() + " packet :" + new String(packet.getData(), 0, packet.getLength()) + " \n", 0);
                    packet.setLength( MAX_PACKET_SIZE );
                }
            } catch (Exception ex) {
                Logger.getLogger(TestMain.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
    }

    class UDPSenderThread extends Thread {
        private int targetport;
        public UDPSenderThread(int targetport) {
            this.targetport = targetport;
        }
        @Override
        public void run() {
            try {
                final DatagramSocket sender = new DatagramSocket();
                final JFrame f = new JFrame("sender: send to port " + targetport);
                f.getContentPane().setLayout(new BorderLayout());
                f.setDefaultCloseOperation(WindowConstants.EXIT_ON_CLOSE);
                JPanel p = new JPanel(new BorderLayout());
                final JFormattedTextField field = new JFormattedTextField(NumberFormat.getNumberInstance());
                field.setColumns(10);
                final JTextArea status = new JTextArea(30, 30);
                field.addActionListener(new ActionListener() {
                    public void actionPerformed(ActionEvent e) {
                        int packets = ((Number) field.getValue()).intValue();
                        if ( packets < 0 ) {
                            JOptionPane.showMessageDialog(f, "The input number must be positive!");
                            return;
                        }
                        try {
                            status.setText("");
                            InetAddress receiverHost = InetAddress.getByName("localhost");
                            DatagramPacket packet = new DatagramPacket(new byte[0], 0, receiverHost, targetport);
                            for ( int i = 0; i < packets; i++ ) {
                                byte[] data = String.valueOf(i+1).getBytes();
                                packet.setData( data );
                                packet.setLength( data.length );
                                status.append("Send the " + (i+1) + " packet \n");
                                sender.send(packet);
                            }
                        } catch (Exception ex) {
                            Logger.getLogger(TestMain.class.getName()).log(Level.SEVERE, null, ex);
                            JOptionPane.showMessageDialog(f, ex.getMessage());
                        }
                    }
                });
                f.getContentPane().add(p, BorderLayout.CENTER);
                p.add( field, BorderLayout.NORTH );
                p.add( new JScrollPane(status), BorderLayout.CENTER );
                f.addWindowListener(new WindowAdapter() {
                    @Override
                    public void windowClosing(WindowEvent e) {
                        System.out.println("Close sender..");
                        sender.close();
                    }
                });
                f.pack();
                f.setVisible(true);
            } catch (Exception ex) {
                Logger.getLogger(TestMain.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
    }

}
      

傳送與接收 UDP 封包

description

應用的時候要記得寄出的封包不一定全都會送到,
接收的時候也不一定能收到全部寄出的封包.

reference

Java Network Programming, 3e

codes

public class TestMain {

    private static final int MAX_PACKET_SIZE = 8192;

    public static void main(String[] args) {
        TestMain test = new TestMain();
        test.test();
    }

    private void test() {
        new UDPReceiverThread().start();
        new UDPSenderThread().start();
    }

    class UDPReceiverThread extends Thread {
        @Override
        public void run() {
            try {
                DatagramSocket receiver = new DatagramSocket(1234);
                DatagramPacket packet = new DatagramPacket(new byte[MAX_PACKET_SIZE], MAX_PACKET_SIZE);
                receiver.receive(packet);
                System.out.println("Receive data : " + new String(packet.getData(), 0, packet.getLength()));
                receiver.close();
            } catch (Exception ex) {
                Logger.getLogger(TestMain.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
    }

    class UDPSenderThread extends Thread {
        @Override
        public void run() {
            try {
                DatagramSocket sender = new DatagramSocket();
                byte[] data = "Hello".getBytes();
                sender.send(new DatagramPacket(data, data.length, InetAddress.getByName("localhost"), 1234));
                sender.close();
            } catch (Exception ex) {
                Logger.getLogger(TestMain.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
    }

}
      

星期二 八月 10, 2010

JMS Consumer and Producer, with swing

description

做了兩個 Thread: ProducerThread 與 ConsumerThread.
ProducerThread 會開一個 JFrame 裡面放 JTextField, 按下 Enter 會發 TextMessage.
ConsumerThread 會開一個 JFrame 裡面放 JTextArea, 接收到 Message 的時候會顯示在 JTextArea 上.
寫完之後發現可以建好幾個 producers 與 consumers 來測試, 感覺還不錯.

reference

Java EE 5
ActiveMQ

codes

public class TestMain {

    public static final String QUEUE_NAME = "TestQueue";
    private ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://0.0.0.0:61616");

    public static void main(String[] args) {
        try {
            TestMain test = new TestMain();
            test.test();
        } catch (Exception ex) {
            Logger.getLogger(TestMain.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

    private void test() throws Exception {
        try {
            new ProducerThread().start();
            new ProducerThread().start();
            new ProducerThread().start();
            new ConsumerThread().start();
            new ConsumerThread().start();
        } catch (Exception ex) {
            Logger.getLogger(TestMain.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

    private void startAndCloseResourcesFinally(String title, JComponent c, final Session session, final Connection conn) {
        try {
            JFrame f = new JFrame(title);
            conn.start();
            f.addWindowStateListener(new WindowAdapter() {

                @Override
                public void windowClosed(WindowEvent e) {
                    try {
                        session.close();
                        conn.close();
                    } catch (Exception ex) {
                        Logger.getLogger(TestMain.class.getName()).log(Level.SEVERE, null, ex);
                    }
                }
            });
            f.setDefaultCloseOperation(WindowConstants.EXIT_ON_CLOSE);
            f.getContentPane().setLayout(new BorderLayout());
            f.getContentPane().add(c, BorderLayout.CENTER);
            f.pack();
            f.setVisible(true);
        } catch (JMSException ex) {
            Logger.getLogger(TestMain.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

    private class ConsumerThread extends Thread {

        @Override
        public void run() {
            try {
                Connection conn = factory.createConnection();
                Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
                Destination dest = session.createQueue(QUEUE_NAME);
                MessageConsumer consumer = session.createConsumer(dest);
                final JTextArea t = new JTextArea(15, 40);
                consumer.setMessageListener(new MessageListener() {
                    public void onMessage(Message msg) {
                        try {
                            if (TextMessage.class.isInstance(msg)) {
                                TextMessage txtMsg = (TextMessage) msg;
                                t.insert(txtMsg.getText() + "\n", 0);
                            }
                        } catch (Exception ex) {
                            Logger.getLogger(TestMain.class.getName()).log(Level.SEVERE, null, ex);
                        }
                    }
                });
                startAndCloseResourcesFinally("Consumer", new JScrollPane(t), session, conn);
            } catch (Exception ex) {
                Logger.getLogger(TestMain.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
    }

    private class ProducerThread extends Thread {

        @Override
        public void run() {
            try {
                Connection conn = factory.createConnection();
                final Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
                Destination dest = session.createQueue(QUEUE_NAME);
                final MessageProducer producer = session.createProducer(dest);
                final JTextField t = new JTextField(20);
                JPanel p = new JPanel(new FlowLayout());
                p.add(t);
                t.addKeyListener(new KeyAdapter() {

                    @Override
                    public void keyPressed(KeyEvent e) {
                        if (e.getKeyCode() == KeyEvent.VK_ENTER) {
                            try {
                                producer.send(session.createTextMessage(t.getText()));
                            } catch (JMSException ex) {
                                Logger.getLogger(TestMain.class.getName()).log(Level.SEVERE, null, ex);
                            } finally {
                                t.setText("");
                            }
                        }
                    }
                });
                startAndCloseResourcesFinally("Producer", p, session, conn);
            } catch (Exception ex) {
                Logger.getLogger(TestMain.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
    }
}
        

星期一 八月 09, 2010

JMS 的 Hello World

description

第一次看 standalone 的 JMS client 怎麼寫, 紀錄一下.
這是用 ActiveMQ, 雖然只是短短的程式, 但感覺花了很多時間啊...Orz
感謝 qrtt1 給予入門的資源與講解~

reference

qrtt1
How do I create new destinations
JavaEE5 API
The Java Message Service API

codes

private void test() throws Exception {
    ConnectionFactory f = new ActiveMQConnectionFactory("tcp://0.0.0.0:61616");
    Connection conn = f.createConnection();
    Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
    Topic dest = session.createTopic("TestTopic"); // create topic
    MessageProducer producer = session.createProducer(dest);
    MessageConsumer consumer = session.createConsumer(dest);
    final CountDownLatch l = new CountDownLatch(1);
    consumer.setMessageListener(new MessageListener() {
        public void onMessage(Message msg) {
            try {
                if ( TextMessage.class.isInstance(msg) ) {
                    TextMessage textMsg = (TextMessage) msg;
                    System.out.println(textMsg.getText());
                } else {
                    System.out.println(msg);
                }
            } catch (Exception ex) {
                Logger.getLogger(TestMain.class.getName()).log(Level.SEVERE, null, ex);
            } finally {
                l.countDown();
            }
        }
    });
    producer.send(session.createTextMessage("Hello World"));
    conn.start();
    session.commit();
    l.await(1, TimeUnit.MINUTES);
    session.close();
    conn.stop();
    conn.close();
}
        

星期三 八月 04, 2010

nio selector

nio selector

reference

Java NIO

description

nioSelector
  1. Selector 有個 factory method : Selector.open()
  2. SelectableChannel 必須處在 nonblocking mode 才可註冊到 Selector 中, 所以要呼叫 configureBlocking(false) 之後才能呼叫 register, 而且在變成 nonblocking mode 之後就不能變回 blocking mode
  3. 一個 Selector 可以註冊多個 SelectableChannel, 在註冊的時候會傳入 SelectionKey.OP_READ 或 SelectionKey.OP_WRITE 或 SelectionKey.OP_CONNECT 或 SelectionKey.OP_ACCEPT 或這幾個數字的組合來表示對哪些動作有興趣.
    不過要注意只有 SelectableChannel.validOps() 有包含的動作才能註冊, 否則就會有 exception.
    註冊之後會由 Selector 負責判斷註冊的哪些有興趣的動作目前已經 ready 並且操作 channel 裡的狀態
  4. 每個 Selector 與 SelectableChannel 的配對會產生一個新的 SelectionKey, 同一組配對如果呼叫 SelectableChannel 多次並不會產生多個 SelectionKey instance, 而只會改變同一個 SelectionKey instance 的 interestOps
  5. SelectionKey 會紀錄 Selector 與 SelectableChannel 的配段關係.
    ServerSocketChannel selectableChannel = ServerSocketChannel.open();
    ServerSocket serverSocket = selectableChannel.socket();
    serverSocket.bind( new InetSocketAddress(1234) );
    selectableChannel.configureBlocking(false);
    Selector testSelector = Selector.open();
    boolean validOpsContainsAccept = (selectableChannel.validOps() & SelectionKey.OP_ACCEPT) != 0;
    if ( validOpsContainsAccept ) {
        SelectionKey createdKey = selectableChannel.register(testSelector, SelectionKey.OP_ACCEPT);
        // 這裡可以顯示 SelectionKey 保存了 SelectableChannel 與 Selector 之間的關係
        System.out.println( createdKey.channel() == selectableChannel ); // true
        System.out.println( createdKey.selector() == testSelector ); // true
    }
                  
  6. 當 SelectionKey 呼叫 cancel 的時候, Selector 與 SelectableChannel 之間的關係並不會立刻解除,
    而是會先記錄在 Selector 裡面說有些 SelectionKey 已經 cancel 了, 直到下次 Selector.select() 的時候才會將已 cancel 的 SelectionKey 所記錄的 Selector 與 SelectableChannel 之間的關係解除, 之後再 register 的話就會建立新的 SelectionKey, 而不是使用已經 cancel 的那個
    如果在 SelectionKey cancel 之後並在 Selector.select() 之前再度呼叫 SelectableChannel.register 就會丟 exception.
    ServerSocketChannel selectableChannel = ServerSocketChannel.open();
    ServerSocket serverSocket = selectableChannel.socket();
    serverSocket.bind( new InetSocketAddress(1234) );
    selectableChannel.configureBlocking(false);
    Selector testSelector = Selector.open();
    selectableChannel.register(testSelector, SelectionKey.OP_ACCEPT);
    testSelector.select();
    Set<SelectionKey> selectedKeys = testSelector.selectedKeys();
    for ( Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext(); ) {
        SelectionKey key = i.next();
        key.cancel();
        // 如果打算在這重新註冊一次就會有 exception
        selectableChannel.register(testSelector, SelectionKey.OP_ACCEPT); // throws exception
    }
                  
  7. SelectionKey cancel 之後不會立刻讓 Selector 與 SelectableChannel deregistered, 但會立刻從 valid 變 invalid
    ServerSocketChannel selectableChannel = ServerSocketChannel.open();
    ServerSocket serverSocket = selectableChannel.socket();
    serverSocket.bind( new InetSocketAddress(1234) );
    selectableChannel.configureBlocking(false);
    Selector testSelector = Selector.open();
    selectableChannel.register(testSelector, SelectionKey.OP_ACCEPT);
    testSelector.select();
    Set<SelectionKey> selectedKeys = testSelector.selectedKeys();
    for ( Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext(); ) {
        SelectionKey key = i.next();
        System.out.println( key.isValid() ); //true
        key.cancel();
        System.out.println( key.isValid() ); //false
    }
                  
  8. 把 SelectableChannel 註冊到 Selector 的時候要指定感興趣的動作, 感興趣的動作可以改變, 可透過重新註冊一次(會回傳同樣的 SelectionKey), 或透過呼叫 SelectionKey.interestOps( ops:int ) 來改變.
    和 SelectionKey.cancel() 要等 Selector.select() 之後才會真的讓 Selector 與 SelectableChannel deregistered 一樣, SelectionKey.interestOps() 的改變也要等下一次 Selector.select() 之後才會發生作用.
  9. Selector 有 selectedKeys(), 可以取得(至少有一個 ready operation)的 keys, 每個 key 則有 readyOps() 裡面存放所有有 ready operation 可執行的 channels. 因此 selectedKeys 與 ready set, 是不一樣的東西.
  10. Selector 內部有三組 key set, 註冊的 key, selectedKey, 與被 cancel 的 key.
    1. 當 Selector.select() 被呼叫的時候, 會從註冊的 key 與 selectedKeys 中把 cancel 過的 key 移除, 之後再從 cancel key set 把所有的 key 移除.
    2. 在 Selector.select() 之後 selectedKey 會選到至少有一個 ready operation 放在 selected key 的 ready operations 中. 在下次 Selector.select() 之前, 對於 selected key 的 ready operations 不會產生效果.
    3. 在 Selector.select() 之後, 如果 SelectionKey 還不在 selected keys set 中, 該 SelectionKey 的 ready operations 會被清空然後換上新偵測到的 ready operation
    4. 在 Selector.select() 之後, 如果 SelectionKey 本來就存在於 selected key set 中, 則該 SelectionKey 的 ready operations 會被更新, 原本就有的 operation 會再加上新的 operation.
      而 selected key set 中的 ready operations 都不會被清空, 會一直保留著.
    5. Selector.select() 回傳的數字是在 select 之後有新加 ready operation 到 SelectionKey 中的 channel 數量.
      如果在 Selector.select() 之前就存在於 selected key set 中, Selector.select() 也沒有新的 ready operation, 該 channel 就不會被算進 Selector.select() 回傳的數字中.
  11. selected key 中的 ready operation 是指既 ready 又包含在 interest operation 中的才算數. 假設 SocketChannel 可以有 CONNECT 與 OP_WRITE 兩種可能的 operation, 當 OP_WRITE 處在 ready 狀態時, 必須要在 Selector.select() 之前就表明對 OP_WRITE 有興趣, 這個 operation 才會被選入 selected key 中
  12. 當 operation 變成 ready 狀態時, key 中的 ready set 會被清空, 然後把該 operation 放進 key 的 ready set, 再把 key 移到 Selector 的 selected key set 中. 而移到 selected key set 的 key 是不會再被清空的, 所以處理完該 key 的 ready operation 要自己從 selected key set 中移除.

星期四 七月 29, 2010

nio 使用 selector 時, read 之後要判斷 EOF

description

看書的時候一直忽略一個範例上的小地方, 導致我寫得練習程式在 client socket close 之後, selector 就會一直選到 read 的 SelectionKey. 後來仔細看才發現原來有很簡單的地方沒注意.

codes

以下程式可用 telnet localhost 1234 之後打幾個字再把 console 關掉來測試效果
  1. 下面這段程式很平常. readable 的時候就去 read, 看起來沒錯.
    問題出在把 console 關掉之後就會發現 selector.selectedKeys() 一直會選到一個 read 的 SelectionKey.
    ByteBuffer buffer = ByteBuffer.allocate(2);
    ServerSocketChannel selectableChannel = ServerSocketChannel.open();
    ServerSocket serverSocket = selectableChannel.socket();
    serverSocket.bind( new InetSocketAddress(1234) );
    selectableChannel.configureBlocking(false);
    Selector testSelector = Selector.open();
    selectableChannel.register(testSelector, SelectionKey.OP_ACCEPT);
    while (true) {
        System.out.println("selected key number : " + testSelector.select());
        for ( Iterator<SelectionKey> i = testSelector.selectedKeys().iterator(); i.hasNext(); ) {
            SelectionKey key = i.next();
            if ( key.isAcceptable() ) {
                System.out.println("accept");
                ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
                SocketChannel socketChannel = serverChannel.accept();
                socketChannel.configureBlocking(false);
                socketChannel.register(testSelector, SelectionKey.OP_READ);
            }
            if ( key.isReadable() ) {
                System.out.println("read");
                SocketChannel socketChannel = (SocketChannel) key.channel();
                buffer.clear();
                socketChannel.read(buffer);
                buffer.flip();
                System.out.println(Charset.defaultCharset().decode(buffer));
            }
            i.remove();
        }
    }
                    
    後來就看一下到底是為什麼, 結果很單純就只是沒判斷 socketChannel.read 回傳的數字是多少而已.
  2. 後來改成這樣就好了
    ByteBuffer buffer = ByteBuffer.allocate(2);
    ServerSocketChannel selectableChannel = ServerSocketChannel.open();
    ServerSocket serverSocket = selectableChannel.socket();
    serverSocket.bind( new InetSocketAddress(1234) );
    selectableChannel.configureBlocking(false);
    Selector testSelector = Selector.open();
    selectableChannel.register(testSelector, SelectionKey.OP_ACCEPT);
    while (true) {
        System.out.println("selected key number : " + testSelector.select());
        for ( Iterator<SelectionKey> i = testSelector.selectedKeys().iterator(); i.hasNext(); ) {
            SelectionKey key = i.next();
            if ( key.isAcceptable() ) {
                System.out.println("accept");
                ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
                SocketChannel socketChannel = serverChannel.accept();
                socketChannel.configureBlocking(false);
                socketChannel.register(testSelector, SelectionKey.OP_READ);
            }
            if ( key.isReadable() ) {
                System.out.println("read");
                SocketChannel socketChannel = (SocketChannel) key.channel();
                buffer.clear();
                if (socketChannel.read(buffer) != -1) {
                    buffer.flip();
                    System.out.println(Charset.defaultCharset().decode(buffer));
                } else {
                    System.out.println("client socket is closed");
                    socketChannel.close();
                }
            }
            i.remove();
        }
    }
                    
    重點就是這段
    if (socketChannel.read(buffer) != -1) {
        buffer.flip();
        System.out.println(Charset.defaultCharset().decode(buffer));
    } else {
        System.out.println("client socket is closed");
        socketChannel.close();
    }
                    
    判斷 -1 就是 client close 了, 很單純, 但其實疑惑了我好一陣子.
    只能說眼殘, 還是提醒自己一下...

星期四 六月 24, 2010

載入大檔不 OutOfMemory - nio MappedByteBuffer

description

感覺 MappedByteBuffer 可以假裝成整個檔案都讀入 ByteBuffer 中,
實際上卻只有一小部分在記憶體裡, 所以不用擔心讀大檔而 OutOfMemory 的問題.

reference

Thinking in Java 4/e

codes

public class TestNIO {

    public static void main(String[] params) {
        TestNIO test = new TestNIO();
        try {
            test.test();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private int kb = 1024;
    private int mb = 1024 * kb;
    private int filesize = 10 * mb;
    
    private void test() throws Exception {
        System.out.println( "new way:" + testMappedByteBuffer() ); // spned 2938 millis to write 10 MB
        System.out.println( "old way1:" + testRandomAccessFile() ); // 5984 millis to write 10 MB
        System.out.println( "old way2:" + testFileOutputStream() ); // 8875 millis to write 10 MB
    }
    
    private long testFileOutputStream() throws Exception {
        long start = System.currentTimeMillis();
        FileOutputStream fout = new FileOutputStream("C:/testOutputStream.txt");
        long writesize = 0;
        byte[] toWrite = null;
        while ( (writesize += (toWrite = (UUID.randomUUID().toString() + "\r\n").getBytes()).length) < filesize ) {
            fout.write( toWrite );
        }
        fout.close();
        System.out.println("finish");
        return System.currentTimeMillis() - start;
    }
    
    private long testRandomAccessFile() throws Exception {
        long start = System.currentTimeMillis();
        RandomAccessFile fout = new RandomAccessFile("C:/test.txt", "rw");
        long writesize = 0;
        byte[] toWrite = null;
        while ( (writesize += (toWrite = (UUID.randomUUID().toString() + "\r\n").getBytes()).length) < filesize ) {
            fout.write( toWrite );
        }
        fout.close();
        System.out.println("finish");
        return System.currentTimeMillis() - start;
    }
    
    private long testMappedByteBuffer() throws Exception {
        long start = System.currentTimeMillis();
        RandomAccessFile file = new RandomAccessFile("C:/test.txt", "rw");
        FileChannel fc = file.getChannel();
        MappedByteBuffer buffer = fc.map(MapMode.READ_WRITE, 0, filesize);
        byte[] toWrite = null;
        while (buffer.remaining() > (toWrite = (UUID.randomUUID().toString() + "\r\n").getBytes()).length) {
            buffer.put( toWrite );
        }
        fc.close();
        file.close();
        System.out.println("finish");
        return System.currentTimeMillis() - start;
    }
    
}

星期二 六月 22, 2010

nio 在檔案讀寫文字 header

description

在存檔讀檔的時候, 想把一些 header 寫在檔案裡面, 換掉原來的檔名. 之後還要能把 header 讀出來並使檔案還原. 或許還可以加上一些加密的動作..

reference

Thinking in Java 4/e

codes

public class TestNIO {

    private static final int HEADER_SIZE = 1000;
    
    public static void main(String[] params) {
        TestNIO test = new TestNIO();
        try {
            test.test();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void test() throws Exception {
        prepareImageFile();
        InputStream imageInputStream = readImageInputStreamFromSomewhere();
        writeImageWithOwnerInfo( imageInputStream );
        decodeImage();
    }

    private void decodeImage() throws Exception {
        FileChannel fc = new FileInputStream("C:/test.ejpg").getChannel();
        ByteBuffer ownerInfo = ByteBuffer.allocate( HEADER_SIZE );
        fc.read( ownerInfo );
        // rewind to print header
        ownerInfo.rewind();
        System.out.println( "owner:" + Charset.forName("UTF-8").decode( ownerInfo ) );
        FileChannel fout = new FileOutputStream("C:/testdecode.jpg").getChannel();
        fc.transferTo(HEADER_SIZE, fc.size()-HEADER_SIZE, fout);
        // close resources
        fc.close();
        fout.close();
    }
    
    private void writeImageWithOwnerInfo(InputStream in) throws Exception {
        FileChannel fc = new FileOutputStream("C:/test.ejpg").getChannel();
        // Can write anything in the header
        ByteBuffer ownerInfo = ByteBuffer.allocate( HEADER_SIZE );
        ownerInfo.put( "中文名".getBytes("UTF-8") );
        // rewind to write in FileChannel
        ownerInfo.rewind();
        fc.write(ownerInfo);
        
        // Channels can convert InputStream into ReadableByteChannel
        ReadableByteChannel channel = Channels.newChannel(in);
        // Write stream after header
        fc.transferFrom( channel, HEADER_SIZE, in.available());
        // close resources
        channel.close();
        fc.close();
    }
    
    private InputStream readImageInputStreamFromSomewhere() throws Exception {
        return new FileInputStream("C:/test.jpg");
    }
    
    private void prepareImageFile() throws Exception {
        BufferedImage bimg = new BufferedImage( 100, 100, BufferedImage.TYPE_INT_ARGB);
        Graphics2D g2 = bimg.createGraphics();
        g2.setColor( Color.YELLOW );
        g2.drawString("測試中文", 10, 10);
        g2.dispose();
        ImageIO.write(bimg, "jpg", new FileOutputStream("C:/test.jpg"));        
    }
    
}

星期五 六月 18, 2010

simple JAXB example

this is a very simple example for recall

1. prepare class Person

    @XmlRootElement
    public class Person {

        private String name = "";
        
        // getter and setter 
        
    }

2. use JAXBContext

	private void test() throws JAXBException {
		Person inputP = new Person();
		inputP.setName("TEST");
        
        
		JAXBContext c = JAXBContext.newInstance(Person.class);
        // will print <?xml version="1.0" encoding="UTF-8" standalone="yes"?><person><name>TEST</name></person>
		c.createMarshaller().marshal(inputP, System.out); 
		
		
		Person outputP = (Person) c.createUnmarshaller().unmarshal( IOUtils.toInputStream( "<person><name>TEST</name></person>" ) );
		System.out.println(outputP.getName()); // print TEST
	}

星期一 六月 14, 2010

用 FileChannel 分割與合併檔案

description

看 java doc, 用 transferFrom/transferTo 效能好像比 loop 呼叫 FileChannel.read/FileChannel.write 讀寫檔案好.

reference

Thinking in Java 4/e

codes

public class TestNIO {

    public static void main(String[] params) {
        TestNIO test = new TestNIO();
        try {
            test.test();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    private void test() throws Exception {
        separateToTmpFiles( 10 );
        combineSeparatedTmpFiles( 10 );
    }
    
    private void combineSeparatedTmpFiles(int tmpFileCount) throws Exception {
        FileChannel fout = new FileOutputStream("D:/testnio/target.rmvb").getChannel();
        for ( int i = 0; i <= tmpFileCount; i++ ) {
            FileChannel fin = new FileInputStream( tmpFilePath(i) ).getChannel();
            fout.transferFrom(fin, fout.size(), fin.size());
            fin.close();
        }
        fout.close();
    }
    
    private void separateToTmpFiles(int tmpFileCount) throws Exception {
        FileChannel fin = new FileInputStream("D:/testnio/source.rmvb").getChannel();
        long sourceFileSize = fin.size();
        long perTmpFileSize = (long) (sourceFileSize / tmpFileCount);
        long transferredSize = 0;
        for ( int i = 0; i <= tmpFileCount; i++ ) {
            FileChannel fout = new FileOutputStream(tmpFilePath(i)).getChannel();
            long notTransferredSize = sourceFileSize - perTmpFileSize;
            boolean finalLoop = notTransferredSize < perTmpFileSize;
            if ( finalLoop ) {
                fin.transferTo(transferredSize, notTransferredSize, fout);
            } else {
                fin.transferTo(transferredSize, perTmpFileSize, fout);
                transferredSize += perTmpFileSize;
            }
            fout.close();
        }
        fin.close();
    }
    
    private String tmpFilePath(int index) {
        return "D:/testnio/testfile.tmp" + index;
    }
}

星期日 六月 13, 2010

nio simple echo server

description

原本用 telnet 測試, 有一些奇怪的現象,
比方說 ServerSocketChannel 會一直取得 write key,
或是只要打一個字就會直接進入 write key 的區塊, 直到寫一個 client 才比較正常.
感覺還不太熟..

reference

Java NetWork

codes

public class TestNIO {

    public static void main(String[] params) {
        TestNIO test = new TestNIO();
        try {
            test.test();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    private void test() throws Exception {
        new EchoServerThread().start();
        TimeUnit.SECONDS.sleep(1);
        for ( int i = 0; i < 100; i++ ) {
            new EchoClientThread(i).start();            
        }
    }
    
    private static class EchoClientThread extends Thread {
        
        public EchoClientThread(int index) {
            super("client-" + index);
        }
        
        @Override
        public void run() {
            try {
                final CountDownLatch countDownLatch = new CountDownLatch(1);
                final Socket socket = new Socket("localhost", 1234);
                new Thread() {
                    public void run() {
                        try {
                            InputStream in = socket.getInputStream();
                            int readCount = 0;
                            byte[] data = new byte[1024];
                            while ( (readCount = in.read(data)) != -1 ) {
                                System.out.println( getName() + " get response : " + new String(data, 0, readCount) );
                            }
                            in.close();
                            countDownLatch.countDown();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }.start();
                PrintStream out = new PrintStream(socket.getOutputStream());
                out.println("ABCDEFGHIJKLMNOPQRSTUVWXYZ\n");
                out.flush();
                countDownLatch.await();
                System.out.println(getName() + " close connection");
                out.close();
                socket.close();
            } catch (Exception e) {
                e.printStackTrace();
            } 
            
        }
    }
    
    private static class EchoServerThread extends Thread {
        
        public EchoServerThread() {
            super("[server]");
        }
        
        @Override
        public void run() {
            try {
                int closeCounts = 0;
                ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
                serverSocketChannel.configureBlocking(false);
                ServerSocket serverSocket = serverSocketChannel.socket();
                serverSocket.bind( new InetSocketAddress(1234) );
                Selector selector = Selector.open();
                serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
                while (true) {
                    System.out.println( "rest keys " + selector.select() );
                    
                    Set<SelectionKey> keys = selector.selectedKeys();
                    for ( Iterator<SelectionKey> i = keys.iterator(); i.hasNext(); ) {
                        SelectionKey key = i.next();
                        i.remove();
                        if ( key.isAcceptable() ) {
                            ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
                            SocketChannel channel = serverChannel.accept();
                            channel.configureBlocking(false);
                            SelectionKey readKey = channel.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ);
                            readKey.attach( ByteBuffer.allocate(10) );
                        }
                        if ( key.isReadable() ) {
                            SocketChannel channel = (SocketChannel) key.channel();
                            ByteBuffer buffer = (ByteBuffer) key.attachment();
                            channel.read( buffer );
                        }
                        if ( key.isWritable() ) {
                            SocketChannel channel = (SocketChannel) key.channel();
                            ByteBuffer buffer = (ByteBuffer) key.attachment();
                            buffer.flip();
                            channel.write( buffer );
                            buffer.compact();
                            if ( new String(buffer.array()).contains("\n") ) {
                                key.cancel();
                                key.channel().close();
                                System.out.println("close count = " + closeCounts++);
                            }
                        }
                    }
                }
                
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }
        
    }
    
}

星期日 五月 30, 2010

nio simple time server

description

client 可以發 request 到 server, server 會回應目前的時間字串.
有 java.io 與 java.nio 兩種版本.
因為這個 time server 是直接回應訊息給 client, 感覺 nio 的效果不大.

reference

Java Network

codes

public class TestNIO {

    public static void main(String[] params) {
        TestNIO test = new TestNIO();
        test.test();
    }
    
    private void test() {
        long start = System.currentTimeMillis();
        new NIOTimeServer(1234).start();
//      new OldTimeServer(1234).start();
        int clientCount = 10000;
        CountDownLatch latch = new CountDownLatch(clientCount);
        for ( int i = 0; i < clientCount; i++ ) {
            new TimeClient(i, "localhost", 1234, latch).start();
        }
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        long stop = System.currentTimeMillis();
        System.out.println( "spent " + ((stop - start) / 1000) + " seconds." );
    }
    
    private static class TimeClient extends Thread {
        
        private final CountDownLatch latch;
        private final String timeServerHost;
        private final int timeServerPort;
        
        public TimeClient(int clientNumber, String timeServerHost, int timeServerPort, CountDownLatch latch) {
            super( "[client-" + clientNumber + "]" );
            this.timeServerHost = timeServerHost;
            this.timeServerPort = timeServerPort;
            this.latch = latch;
        }
        
        @Override
        public void run() {
            Socket socket = null;
            BufferedReader reader = null;
            try {
                socket = new Socket(timeServerHost, timeServerPort);
                reader = new BufferedReader( new InputStreamReader(socket.getInputStream()) );
                println( reader.readLine() );
            } catch (Exception ex) {
                ex.printStackTrace();
            } finally {
                try {
                    println( "close" );
                    if ( socket != null ) {
                        socket.close();
                    }
                    if ( reader != null ) {
                        reader.close();
                    }
                    latch.countDown();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

        private void println(String msg) {
            System.out.println( getName() + ":" + msg );
        }
        
    }
    
    
    private static class OldTimeServer extends Thread {
        
        private final int port;
        
        public OldTimeServer(int port) {
            super("[server-" + port + "]");
            this.port = port;
        }
        
        @Override
        public void run() {
            ServerSocket server = null;
            
            try {
                server = new ServerSocket(port);
                while (true) {
                    Socket socket = server.accept();
                    println( "connection established " + socket );
                    OutputStream out = new BufferedOutputStream(socket.getOutputStream());
                    out.write( (new Date().toString() + "\r\n").getBytes() );
                    out.flush();
                    out.close();                    
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    if ( server != null ) {
                        server.close();
                    }
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }       
        
        private void println(String msg) {
            System.out.println( getName() + ":" + msg );
        }
        
    }
    
    private static class NIOTimeServer extends Thread {
        
        private final int port;
        
        public NIOTimeServer(int port) {
            super("[server-" + port + "]");
            this.port = port;
        }
        
        @Override
        public void run() {
            try {
                ServerSocketChannel serverChannel = ServerSocketChannel.open();
                serverChannel.configureBlocking(false);
                ServerSocket serverSocket = serverChannel.socket();
                serverSocket.bind( new InetSocketAddress(port) );
                Selector selector = Selector.open();
                serverChannel.register(selector, SelectionKey.OP_ACCEPT);
                while (true) {
                    selector.select();
                    Set<SelectionKey> keys = selector.selectedKeys();
                    for ( Iterator<SelectionKey> i = keys.iterator(); i.hasNext(); ) {
                        SelectionKey key = i.next();
                        try {
                            if ( key.isAcceptable() ) {
                                ServerSocketChannel server = (ServerSocketChannel) key.channel();
                                SocketChannel channel = server.accept();
                                channel.configureBlocking(false);
                                println("connection established " + channel);
                                SelectionKey writeKey = channel.register(selector, SelectionKey.OP_WRITE);
                                ByteBuffer buffer = ByteBuffer.wrap( (String.valueOf(new Date()) + "\r\n").getBytes() );
                                writeKey.attach( buffer );
                            } else if ( key.isWritable() ) {
                                SocketChannel channel = (SocketChannel) key.channel();
                                ByteBuffer buffer = (ByteBuffer) key.attachment();
                                channel.write( buffer );
                                channel.close();
                            }
                            i.remove();
                        } catch (Exception ex) {
                            ex.printStackTrace();
                            key.cancel();
                        }
                    }
                }
            } catch (Exception ex) {
                ex.printStackTrace();
            }
            
        }
        
        private void println(String msg) {
            System.out.println( getName() + ":" + msg );
        }
        
    }
    
}

星期二 五月 11, 2010

使能先作別的事情, 等需要時再取計算結果: FutureTask

FutureTask

description

先決定要做什麼事情, 交給 FutureTask 放在 Executor 去做之後就(new thread)就先做自己的事情,
做完等需要的時候再回 main thread 取 FutureTask 處理事情的結果.
呼叫 FutureTask.get 的時候如果事情還沒處理完就會 block 住

reference

Future
FutureTask

codes

public class TestFuture {

	public static void main(String[] args) {
		TestFuture test = new TestFuture();
		test.test();
	}

	private void test() {
		FutureTask task = new FutureTask(new Callable() {
			@Override
			public Result call() throws Exception {
				System.out.println("call..");
				TimeUnit.SECONDS.sleep(3);
				Result result = new Result();
				result.setName( String.valueOf(System.currentTimeMillis()) );
				System.out.println("call finished");
				return result;
			}
		});
		ExecutorService service = Executors.newFixedThreadPool(1);
		service.execute(task);
		try {
			// do something
			String prefix = "test-";
			TimeUnit.SECONDS.sleep(2);
			// finally we need Future result
			System.out.println(prefix + task.get().getName());
		} catch (Exception ex) {
			ex.printStackTrace();
		}
		System.out.println("main finished");
		service.shutdown();
	}

	private static class Result {

		private String name = "";

		public String getName() {
			return name;
		}

		public void setName(String name) {
			this.name = name;
		}
	}

}
		

星期一 五月 10, 2010

等所有 thread 事情做完再繼續: CountDownLatch

CountDownLatch

description

就是"等"所有或指定的 thread 把事情都做完再繼續接下來的工作

reference

CountDownLatch

codes

public class TestCountDownLatch {

    public static void main(String[] args) {
        TestCountDownLatch test = new TestCountDownLatch();
        test.test();
    }

    private void test() {
        CountDownLatch latch = new CountDownLatch(5);
        new SleepThread("1", latch, 1000).start();
        new SleepThread("2", latch, 2000).start();
        new SleepThread("3", latch, 3000).start();
        new SleepThread("4", latch, 4000).start();
        new SleepThread("5", latch, 5000).start();
        try {
            latch.await();
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
        System.out.println("all waked up");
    }

    private static class SleepThread extends Thread {

        private long sleepTime = 0;
        private final CountDownLatch latch;

        public SleepThread(String name, CountDownLatch latch, long sleepTime) {
            super(name);
            this.latch = latch;
            this.sleepTime = sleepTime;
        }

        @Override
        public void run() {
            try {
                System.out.println(getName() + " sleep...");
                Thread.sleep(sleepTime);
                System.out.println(getName() + " awake");
                latch.countDown();
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        }

    }

}
      

星期五 五月 07, 2010

非同步化交換兩個 thread 的物件: Exchanger

description

這個 class 的功能是交換 thread 的物件, 但遇到需求的使用目的就是要取得另一個 thread 的物件, 如果沒取得就 wait.

reference

java.util.concurrent.Exchanger

codes

public class TestExchanger {

    public static void main(String[] args) {
        TestExchanger test = new TestExchanger();
        test.test();
    }

    private void test() {
        final Exchanger<IntValue> exchanger = new Exchanger<IntValue>();
        new Thread() {
            @Override
            public void run() {
                try {
                    System.out.println("exchange min");
                    IntValue val = exchanger.exchange(new IntValue(Integer.MIN_VALUE));
                    System.out.println(val.getVal() == Integer.MAX_VALUE);
                } catch (InterruptedException ex) {
                    ex.printStackTrace();
                }
            }
        }.start();
        new Thread() {
            @Override
            public void run() {
                try {
                    TimeUnit.SECONDS.sleep(5);
                    System.out.println("exchange max");
                    IntValue val = exchanger.exchange(new IntValue(Integer.MAX_VALUE));
                    System.out.println(val.getVal() == Integer.MIN_VALUE);
                } catch (InterruptedException ex) {
                    ex.printStackTrace();
                }
            }
        }.start();

    }



    private static class IntValue {
        private final int val;
        public IntValue(int val) {
            this.val = val;
        }
        public int getVal() {
            return val;
        }
    }

}