view agent/ipc/windows-named-pipes/server/src/main/java/com/redhat/thermostat/agent/ipc/winpipes/server/internal/WritePipeImpl.java @ 2589:a6ba41a449c8

[PATCH] Windows Named Pipes - preliminary implementation A preliminary implementation of Windows named pipes. Does not set Windows named pipes as default IPC; remains TCP reviewed-by: aazores, ebaron review-thread: http://icedtea.classpath.org/pipermail/thermostat/2017-February/022126.html
author Simon Tooke <stooke@redhat.com>
date Fri, 10 Feb 2017 15:52:51 -0500
parents
children 2885a4a290d0
line wrap: on
line source

/*
 * Copyright 2012-2017 Red Hat, Inc.
 *
 * This file is part of Thermostat.
 *
 * Thermostat is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published
 * by the Free Software Foundation; either version 2, or (at your
 * option) any later version.
 *
 * Thermostat is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with Thermostat; see the file COPYING.  If not see
 * <http://www.gnu.org/licenses/>.
 *
 * Linking this code with other modules is making a combined work
 * based on this code.  Thus, the terms and conditions of the GNU
 * General Public License cover the whole combination.
 *
 * As a special exception, the copyright holders of this code give
 * you permission to link this code with independent modules to
 * produce an executable, regardless of the license terms of these
 * independent modules, and to copy and distribute the resulting
 * executable under terms of your choice, provided that you also
 * meet, for each linked independent module, the terms and conditions
 * of the license of that module.  An independent module is a module
 * which is not derived from or based on this code.  If you modify
 * this code, you may extend this exception to your version of the
 * library, but you are not obligated to do so.  If you do not wish
 * to do so, delete this exception statement from your version.
 */

package com.redhat.thermostat.agent.ipc.winpipes.server.internal;

import com.redhat.thermostat.agent.ipc.winpipes.common.internal.WinPipesNativeHelper;
import com.redhat.thermostat.common.utils.LoggingUtils;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.logging.Logger;

/**
 * Write side of a Windows named pipe, driven by overlapped (asynchronous) I/O.
 *
 * <p>Callers hand buffers to {@link #write(ByteBuffer)}, which appends them to a FIFO queue
 * and signals {@link #getHandle() the write event}. {@link #processEvent()} then collects the
 * result of any write in flight and starts the next one from the queue.
 *
 * <p>Threading: {@code write()} and the event-processing path share {@code writeQueue} and
 * {@code writeState}. All decisions that combine "is the queue empty?" with "set/reset the
 * event" or "start a write" are taken while holding the {@code writeQueue} monitor so that a
 * wake-up can neither be lost nor raised while a write is already in flight.
 * NOTE(review): this assumes {@code processEvent()} is only ever invoked from a single
 * selector thread - confirm against WindowsEventSelector.
 */
class WritePipeImpl implements WindowsEventSelector.EventHandler {

    private static final Logger logger = LoggingUtils.getLogger(WritePipeImpl.class);
    private static WinPipesNativeHelper helper = WinPipesNativeHelper.INSTANCE;

    /** Windows error code ERROR_IO_PENDING: the overlapped operation was started and is still in progress. */
    private static final int ERROR_IO_PENDING = 997;

    enum WritePipeState { QUIET_STATE, WRITING_STATE, FLUSHING_WRITE, ERROR_STATE, CLOSED_STATE }

    // volatile: read by write() and updated by the event-processing path and close()
    private volatile WritePipeState writeState;

    private final PipeManager manager;
    private final String pipeName;
    private final long pipeHandle;
    // guarded by its own monitor
    private final Queue<ByteBuffer> writeQueue;
    private final long writeEventHandle;
    private final ByteBuffer writeOverlap;
    // staging buffer handed to the native write; remaining() == 0 means "nothing staged"
    private final ByteBuffer writeBuffer;

    /**
     * Creates the write side for an already opened pipe.
     *
     * @param manager    owner that is asked to reset the pipe when a write fails
     * @param pipeName   pipe name, used in messages only
     * @param pipeHandle native handle of the pipe
     * @param bufsize    size passed to the helper when allocating the staging buffer
     * @throws IOException if the Windows event used to signal writes cannot be created
     */
    WritePipeImpl(PipeManager manager, String pipeName, long pipeHandle, int bufsize) throws IOException {
        this.manager = manager;
        this.pipeName = pipeName;
        this.writeState = WritePipeState.QUIET_STATE;
        this.pipeHandle = pipeHandle;
        this.writeEventHandle = helper.createEvent();
        if (this.writeEventHandle == 0) {
            throw new IOException(this.pipeName + ": can't create a Windows event" + " err=" + helper.getLastError());
        }
        this.writeQueue = new ArrayDeque<>();

        this.writeOverlap = helper.createDirectOverlapStruct(writeEventHandle);
        this.writeBuffer = helper.createDirectBuffer(bufsize);
        // enqueueWrite() treats remaining() == 0 as "staging buffer empty". Establish that
        // explicitly instead of relying on how the helper initialises the buffer's limit
        // (no-op if the helper already returns an empty buffer).
        this.writeBuffer.limit(0);
    }

    @Override
    public String toString() {
        return "WritePipeImpl(h=" + pipeHandle + " '" + pipeName + "' " + writeState + " q=" + writeQueue.size() + ")";
    }

    /**
     * @return the handle of the event associated with this pipe's writes
     */
    @Override
    public long getHandle() {
        return writeEventHandle;
    }

    /**
     * Collects the result of a pending write (if any) and, when the pipe is idle afterwards,
     * starts the next queued write.
     *
     * @throws IOException if the pipe is in a state in which events cannot be processed, or
     *                     if the native helper reports a failure
     */
    @Override
    public void processEvent() throws IOException {
        if (handlePendingWrite()) {
            enqueueNextOperation();
        }
    }

    /**
     * Queues {@code src} for writing. If no write is in progress the write event is signalled
     * so that the front of the queue gets written.
     *
     * <p>The buffer is consumed later by the event-processing path; the caller must not
     * modify it after this call.
     *
     * @param src data to write; its remaining bytes must fit in the staging buffer
     * @return the number of bytes that were queued
     * @throws IOException if {@code src} holds more bytes than a single write can carry
     */
    public int write(ByteBuffer src) throws IOException {
        // Capture the size now: once queued, the buffer may be drained by another thread,
        // so reading src.remaining() afterwards would be a race.
        final int queued = src.remaining();
        logger.finest("write() - entered " + this + " bytes=" + queued);

        // Each queued buffer is copied whole into the staging buffer. Reject oversized buffers
        // here, with a checked exception, rather than letting the copy fail later with an
        // unchecked BufferOverflowException in the event-processing path.
        if (queued > writeBuffer.capacity()) {
            throw new IOException(pipeName + ": cannot write " + queued
                    + " bytes; maximum per write is " + writeBuffer.capacity());
        }

        synchronized (writeQueue) {
            writeQueue.add(src);
            logger.finest("write() - adding to writeQueue (new size= " + writeQueue.size() + ") bytes=" + queued);
            // Signal under the lock: enqueueWrite() resets the event and starts writes under
            // the same lock, so this wake-up cannot be lost and cannot fire mid-write.
            if (writeState == WritePipeState.QUIET_STATE) {
                helper.setEvent(writeEventHandle);
            }
        }
        logger.finest("write() - exited " + queued + " " + this);
        return queued;
    }

    /**
     * Cancels outstanding I/O and releases the native resources owned by this object.
     * Calling it more than once is harmless; only the first call releases anything.
     *
     * @throws IOException if the native helper reports a failure
     */
    public void close() throws IOException {
        synchronized (writeQueue) {
            if (writeState == WritePipeState.CLOSED_STATE) {
                // already closed - freeing the native buffers/handle twice would be a bug
                return;
            }
            writeState = WritePipeState.CLOSED_STATE;
        }
        helper.cancelAllIo(pipeHandle, writeOverlap);
        helper.freeDirectBuffer(writeOverlap);
        helper.freeDirectBuffer(writeBuffer);
        helper.closeHandle(writeEventHandle);
    }

    /** Starts the next write, but only when the pipe is idle. */
    private void enqueueNextOperation() throws IOException {
        if (writeState == WritePipeState.QUIET_STATE) {
            enqueueWrite();
        }
    }

    /**
     * handlePendingWrite - if there is a pending write then retrieve and process its result.
     * It's possible the event was raised with no pending I/O (for example by write()).
     *
     * @return true if enqueueNextOperation() should be called, false otherwise
     * @throws IOException if the pipe is in a state that has no write to collect
     *                     (error or closed), or if the native helper reports a failure
     */
    private boolean handlePendingWrite() throws IOException {
        logger.finest("handlePendingWrite() - entered " + this);
        final WritePipeState state = writeState;
        if (state != WritePipeState.QUIET_STATE) {
            // Validate before touching native memory: after close() the overlapped structure
            // has been freed and must not be handed to the helper again.
            if (state != WritePipeState.WRITING_STATE && state != WritePipeState.FLUSHING_WRITE) {
                throw new IOException("Invalid pipe state " + state);
            }

            logger.finest("handlePendingWrite() waiting for overlapped result on " + this + " state=" + state);
            final int bytesWritten = helper.getOverlappedResult(pipeHandle, writeOverlap, false);
            final int err = helper.getLastError();
            logger.finest("handlePendingWrite() got overlapped result (bytes=" + bytesWritten + " on " + this + " err=" + err);

            // anything other than a complete write of the staged data is treated as fatal
            if (bytesWritten != writeBuffer.remaining()) {
                writeState = WritePipeState.ERROR_STATE;
                manager.resetPipe();
                return false;
            }
            // mark the staging buffer as empty (remaining() == 0)
            writeBuffer.position(0);
            writeBuffer.limit(0);
        }
        this.writeState = WritePipeState.QUIET_STATE;
        logger.finest("handlePendingWrite() - exited " + this);
        return true;
    }


    /**
     * Start an overlapped write if there's any unwritten data in the staging buffer or in the
     * write queue. When there is nothing to write the event is reset so that this handler is
     * not triggered again until write() signals it.
     *
     * @return false if there was nothing to write; true otherwise (including when the write
     *         failed and the pipe was put into the error state)
     * @throws IOException if an IO error occurred
     */
    private boolean enqueueWrite() throws IOException {
        logger.finest("enqueueWrite() - entered " + this);
        final boolean ret;
        final int err;
        // Everything that write() can observe (queue contents, event state, writeState) is
        // changed under the queue lock, so write() never signals the event between our
        // "queue is empty" check and the reset, nor between starting the write and entering
        // WRITING_STATE. writeFileOverlapped only starts the operation, so the lock is held briefly.
        synchronized (this.writeQueue) {
            if (!writeBuffer.hasRemaining()) {
                if (writeQueue.isEmpty()) {
                    if (writeState == WritePipeState.FLUSHING_WRITE) {
                        // all the data that's ever going to be on that queue has been fully written
                        writeState = WritePipeState.QUIET_STATE;
                        logger.finest("enqueueWrite() - exited true " + this);
                        return true;
                    }
                    logger.finest("enqueueWrite() nothing to write - sleeping for a bit");
                    helper.resetEvent(writeEventHandle);
                    logger.finest("enqueueWrite() - exited false " + this);
                    return false;
                }
                // stage the next queued buffer: open the whole capacity, copy, then flip for reading
                writeBuffer.clear();
                writeBuffer.put(writeQueue.remove());
                logger.finest("enqueueWrite() - grabbing next buffer from queue (new queue size=" + writeQueue.size() + ")");
                writeBuffer.flip();
            }
            logger.finest("enqueueWrite() - start overlapped writing " + this + " bytes=" + writeBuffer.remaining());
            ret = helper.writeFileOverlapped(pipeHandle, writeOverlap, writeBuffer);
            writeState = WritePipeState.WRITING_STATE;
            err = ret ? 0 : helper.getLastError();
        }
        logger.finest("enqueueWrite() - finished overlapped writing " + this + " bytes written=" + ret + " err=" + err);
        // ERROR_IO_PENDING just means the write is in flight; completion arrives via the event
        if (!ret && err != 0 && err != ERROR_IO_PENDING) {
            writeState = WritePipeState.ERROR_STATE;
            manager.resetPipe();
        }

        logger.finest("enqueueWrite() - exited " + this);
        return writeState != WritePipeState.QUIET_STATE;
    }
}