view agent/ipc/windows-named-pipes/common/src/main/java/com/redhat/thermostat/agent/ipc/winpipes/common/internal/MessageReader.java @ 2589:a6ba41a449c8

[PATCH] Windows Named Pipes - preliminary implementation. A preliminary implementation of Windows named pipes. Does not set Windows named pipes as the default IPC; the default remains TCP. reviewed-by: aazores, ebaron review-thread: http://icedtea.classpath.org/pipermail/thermostat/2017-February/022126.html
author Simon Tooke <stooke@redhat.com>
date Fri, 10 Feb 2017 15:52:51 -0500
parents
children
line wrap: on
line source

/*
 * Copyright 2012-2017 Red Hat, Inc.
 *
 * This file is part of Thermostat.
 *
 * Thermostat is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published
 * by the Free Software Foundation; either version 2, or (at your
 * option) any later version.
 *
 * Thermostat is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with Thermostat; see the file COPYING.  If not see
 * <http://www.gnu.org/licenses/>.
 *
 * Linking this code with other modules is making a combined work
 * based on this code.  Thus, the terms and conditions of the GNU
 * General Public License cover the whole combination.
 *
 * As a special exception, the copyright holders of this code give
 * you permission to link this code with independent modules to
 * produce an executable, regardless of the license terms of these
 * independent modules, and to copy and distribute the resulting
 * executable under terms of your choice, provided that you also
 * meet, for each linked independent module, the terms and conditions
 * of the license of that module.  An independent module is a module
 * which is not derived from or based on this code.  If you modify
 * this code, you may extend this exception to your version of the
 * library, but you are not obligated to do so.  If you do not wish
 * to do so, delete this exception statement from your version.
 */

package com.redhat.thermostat.agent.ipc.winpipes.common.internal;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/**
 * Incremental reader that reassembles messages from arbitrary chunks of bytes.
 *
 * Each message part consists of a header followed by a payload. The header is
 * read in two steps: first a fixed-size prefix of {@link #MIN_HEADER_SIZE}
 * bytes, which yields the total header size, then the remainder of the header,
 * which yields the payload size and whether more parts follow. Completed parts
 * are collected until a part without the "more data" flag arrives, at which
 * point the joined payload is handed to {@link #readFullMessage(ByteBuffer)}.
 *
 * Once any failure occurs while processing, the reader enters
 * {@link ReadState#ERROR} and rejects all further data.
 */
abstract class MessageReader {

    // States used to track how much of a message (header) we've processed
    protected enum ReadState { NEW_MESSAGE, MIN_HEADER_READ, FULL_HEADER_READ, ERROR }
    // Number of bytes to read from the header to determine the total header size
    protected static final int MIN_HEADER_SIZE = MessageHeader.getMinimumHeaderSize();
    // If true, dumps header information for each header read/written
    private static final boolean DEBUG_HEADER = false;

    // Fixed size buffer used to read the initial part of a message header
    private final ByteBuffer minHeaderBuf;
    // Various message-related limits
    private final MessageLimits limits;

    // Current state of data that has been processed by this reader
    protected ReadState state;
    // Message header for the message currently being processed
    protected MessageHeader currentHeader;
    // Buffer used for the remainder of the message header,
    // after the minimum has been read
    private ByteBuffer headerBuf;
    // Buffer used for the message payload of the message being processed currently
    private ByteBuffer messageBuf;
    // Messages that have finished being processed, but are part of a multi-part message
    // that has not been fully processed yet.
    private List<ByteBuffer> messages;

    /**
     * Creates a reader that is ready to receive the start of a new message.
     *
     * @param limits size limits applied to headers, message parts and
     *               complete messages
     */
    protected MessageReader(MessageLimits limits) {
        this.state = ReadState.NEW_MESSAGE;
        this.currentHeader = null;
        this.minHeaderBuf = ByteBuffer.allocate(MIN_HEADER_SIZE);
        this.limits = limits;
        this.headerBuf = null;
        this.messageBuf = null;
        this.messages = new ArrayList<ByteBuffer>();
    }

    /**
     * Consumes all bytes remaining in {@code readBuffer}, advancing the reader
     * through as many headers and payloads as the data allows. Data may be
     * split across calls at any byte boundary; partial headers and payloads
     * are retained until the rest arrives.
     *
     * @param readBuffer buffer positioned at the next unread byte; its
     *                   position is advanced past everything consumed
     * @throws IOException if a header declares a size outside the permitted
     *         range, if a joined multi-part message exceeds the maximum
     *         message size, or if data is supplied after a previous failure
     * @throws IllegalStateException if the reader's internal bookkeeping is
     *         inconsistent with its current state
     */
    protected void processData(ByteBuffer readBuffer) throws IOException {
        try {
            // Besides consuming input, keep stepping while the current stage is
            // already complete (zero-length header remainder or payload).
            // Otherwise such a stage would stall until unrelated bytes arrive.
            while (readBuffer.hasRemaining() || canAdvanceWithoutInput()) {
                switch (state) {
                case NEW_MESSAGE:
                    // Append data to current buffer, if room
                    putMinRemaining(minHeaderBuf, readBuffer);

                    // Is there enough data to read the minimum header size?
                    if (!minHeaderBuf.hasRemaining()) {
                        minHeaderBuf.position(0);
                        if (currentHeader != null || headerBuf != null) {
                            throw new IllegalStateException("Header already processed");
                        }
                        // Create Header
                        currentHeader = MessageHeader.fromByteBuffer(minHeaderBuf);
                        minHeaderBuf.clear();

                        // Check header size: it can be neither shorter than the
                        // part already read nor longer than the configured limit
                        int headerSize = currentHeader.getHeaderSize();
                        if (headerSize < MIN_HEADER_SIZE) {
                            throw new IOException("Message header size smaller than minimum of "
                                    + MIN_HEADER_SIZE + " bytes");
                        }
                        if (headerSize > limits.getMaxHeaderSize()) {
                            throw new IOException("Message header size larger than maximum of "
                                    + limits.getMaxHeaderSize() + " bytes");
                        }
                        // Set up reader to process remainder of header
                        int remainingHeader = headerSize - MIN_HEADER_SIZE;
                        headerBuf = ByteBuffer.allocate(remainingHeader);
                        state = ReadState.MIN_HEADER_READ;
                    }
                    break;
                case MIN_HEADER_READ:
                    if (currentHeader == null || headerBuf == null) {
                        throw new IllegalStateException("No header available");
                    } else if (messageBuf != null) {
                        throw new IllegalStateException("Message already processed");
                    }

                    // Append data to current buffer, if room
                    putMinRemaining(headerBuf, readBuffer);

                    // Have we read the full header?
                    if (!headerBuf.hasRemaining()) {
                        headerBuf.position(0);

                        // Finish setting MessageHeader object fields
                        currentHeader.setRemainingFields(headerBuf);

                        // Set up reader to process message payload
                        int messageSize = currentHeader.getMessageSize();
                        if (messageSize < 0) {
                            throw new IOException("Message part size is negative: " + messageSize);
                        }
                        if (messageSize > limits.getMaxMessagePartSize()) {
                            throw new IOException("Message part size larger than maximum of "
                                    + limits.getMaxMessagePartSize() + " bytes");
                        }
                        messageBuf = ByteBuffer.allocate(messageSize);
                        headerBuf = null;
                        state = ReadState.FULL_HEADER_READ;

                        // Dump header information if requested
                        if (DEBUG_HEADER) {
                            currentHeader.dumpHeader("[Read] ");
                        }
                    }
                    break;
                case FULL_HEADER_READ:
                    if (currentHeader == null || messageBuf == null) {
                        throw new IllegalStateException("Missing header or message");
                    }
                    // Append data to current buffer, if room
                    putMinRemaining(messageBuf, readBuffer);

                    // Have we read the full message?
                    if (!messageBuf.hasRemaining()) {
                        messageBuf.position(0);

                        // Store this message until we received all parts
                        messages.add(messageBuf);
                        // Did we receive all parts of this message?
                        if (!currentHeader.isMoreData()) {
                            // Notify listener
                            ByteBuffer fullMessage = joinMessages();
                            readFullMessage(fullMessage);
                            // Start new list of message parts
                            messages = new ArrayList<ByteBuffer>();
                        }

                        // Reset reader state
                        messageBuf = null;
                        currentHeader = null;
                        state = ReadState.NEW_MESSAGE;
                    }
                    break;
                case ERROR:
                    throw new IOException("Reader state corrupted by previous fatal error");
                default:
                    throw new IllegalStateException("Unknown state: " + state.name());
                }
            }
        } catch (IOException e) {
            // Set to error state to stop this reader from processing more data
            this.state = ReadState.ERROR;
            throw e;
        } catch (RuntimeException e) {
            // Any unchecked failure (including IllegalStateException) may leave a
            // half-processed part behind, so refuse further data in that case too.
            this.state = ReadState.ERROR;
            throw e;
        }
    }

    /**
     * Invoked once per complete message, after all of its parts have been
     * received and joined.
     *
     * @param fullMessage the complete payload, positioned at its first byte
     */
    protected abstract void readFullMessage(ByteBuffer fullMessage);

    /**
     * Reports whether the stage the reader is currently in has already
     * received every byte it needs, so that it can be completed even when no
     * input is available. Only the header-remainder and payload stages can be
     * in this situation, namely when their declared length is zero.
     */
    private boolean canAdvanceWithoutInput() {
        switch (state) {
        case MIN_HEADER_READ:
            return headerBuf != null && !headerBuf.hasRemaining();
        case FULL_HEADER_READ:
            return messageBuf != null && !messageBuf.hasRemaining();
        default:
            return false;
        }
    }

    /**
     * Concatenates all collected parts into a single buffer ready for reading.
     * A message consisting of exactly one part is returned as-is, without
     * copying and without the total-size check.
     *
     * NOTE(review): the total size is only checked here, i.e. after the last
     * part has arrived; parts are accumulated in memory until then.
     *
     * @throws IOException if the combined size of the parts overflows or
     *         exceeds the maximum message size
     */
    private ByteBuffer joinMessages() throws IOException {
        // Single part shortcut
        if (messages.size() == 1) {
            return messages.get(0);
        }

        int totalSize = 0;
        int maxMessageSize = limits.getMaxMessageSize();
        for (ByteBuffer buf : messages) {
            totalSize += buf.limit();
            // Check for overflow as well as size limit
            if (totalSize < 0 || totalSize > maxMessageSize) {
                throw new IOException("Total message size is larger than maximum of " + maxMessageSize + " bytes");
            }
        }

        ByteBuffer fullMessage = ByteBuffer.allocate(totalSize);
        for (ByteBuffer buf : messages) {
            fullMessage.put(buf);
        }
        fullMessage.flip();
        return fullMessage;
    }

    /**
     * Transfers as many bytes as possible from {@code src} to {@code dst}
     * without overflowing {@code dst} or underflowing {@code src}.
     */
    private void putMinRemaining(ByteBuffer dst, ByteBuffer src) {
        int count = Math.min(dst.remaining(), src.remaining());
        if (count == 0) {
            return;
        }
        // Bulk put copies everything up to src's limit, so narrow the limit to
        // the number of bytes dst can accept and restore it afterwards.
        int savedLimit = src.limit();
        src.limit(src.position() + count);
        try {
            dst.put(src);
        } finally {
            src.limit(savedLimit);
        }
    }

    // For testing purposes
    ReadState getState() {
        return state;
    }

    // For testing purposes
    MessageHeader getCurrentHeader() {
        return currentHeader;
    }

}