欢迎访问移动开发之家(rcyd.net),关注移动开发教程。移动开发之家  移动开发问答|  每日更新
页面位置 : > > > 内容正文

文件队列 QueueFile.java,队列queuefile.java

来源: 开发者 投稿于  被查看 10207 次 评论:74

文件队列 QueueFile.java,队列queuefile.java


QueueFile.java 文件队列

1.QueueFile.java

    /**
     * Copyright (C) 2010 Square, Inc.
     *
     * Licensed under the Apache License, Version 2.0 (the "License");
     * you may not use this file except in compliance with the License.
     * You may obtain a copy of the License at
     *
     * http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    package com.squareup.util;
     
    import com.squareup.Square;
    import java.io.File;
    import java.io.FileNotFoundException;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.RandomAccessFile;
    import java.nio.channels.FileChannel;
    import java.util.NoSuchElementException;
     
    /**
     * A reliable, efficient, file-based, FIFO queue. Additions and removals are
     * O(1). All operations are atomic. Writes are synchronous; data will be
     * written to disk before an operation returns. The underlying file is
     * structured to survive process and even system crashes. If an I/O exception
     * is thrown during a mutating change, the change is aborted. It is safe to
     * continue to use a {@code QueueFile} instance after an exception.
     *
     * <p>All operations are synchronized. In a traditional queue, the remove
     * operation returns an element. In this queue, {@link #peek} and {@link
     * #remove} are used in conjunction. Use {@code peek} to retrieve the first
     * element, and then {@code remove} to remove it after successful processing.
     * If the system crashes after {@code peek} and during processing, the element
     * will remain in the queue, to be processed when the system restarts.
     *
     * <p><b><font color="red">NOTE:</font></b> The current implementation is
     * built for file systems that support atomic segment writes (like YAFFS).
     * Most conventional file systems don't support this; if the power goes out
     * while writing a segment, the segment will contain garbage and the file will
     * be corrupt. We'll add journaling support so this class can be used with
     * more file systems later.
     *
     * @author Bob Lee (bob@squareup.com)
     */
    public class QueueFile {
     
    /** Size in bytes of a freshly initialized (or cleared) queue file. */
    private static final int INITIAL_LENGTH = 4096; // one file system block
     
    /** Length of the file header in bytes; ring-buffer data starts at this offset. */
    static final int HEADER_LENGTH = 16;
     
    /**
      * The underlying file. Uses a ring buffer to store entries. Designed so
      * that a modification isn't committed or visible until we write the header.
      * The header is much smaller than a segment. So long as the underlying file
      * system supports atomic segment writes, changes to the queue are atomic.
      * Storing the file length ensures we can recover from a failed expansion
      * (i.e. if setting the file length succeeds but the process dies before the
      * data can be copied).
      *
      * <pre>
      * Format:
      * Header (16 bytes)
      * Element Ring Buffer (File Length - 16 bytes)
      *
      * Header:
      * File Length (4 bytes)
      * Element Count (4 bytes)
      * First Element Position (4 bytes, =0 if null)
      * Last Element Position (4 bytes, =0 if null)
      *
      * Element:
      * Length (4 bytes)
      * Data (Length bytes)
      * </pre>
      */
    private final RandomAccessFile raf;
     
    /**
      * Cached file length as recorded in the header. Starts at
      * {@code INITIAL_LENGTH} and is doubled on expansion, so it stays a power
      * of 2 for files created by this class. Package-visible so tests can read it.
      */
    int fileLength;
     
    /** Number of elements currently in the queue. */
    private int elementCount;
     
    /** Pointer to first (or eldest) element; {@code Element.NULL} when the queue is empty. */
    private Element first;
     
    /** Pointer to last (or newest) element; {@code Element.NULL} when the queue is empty. */
    private Element last;
     
    /**
      * In-memory scratch buffer. Big enough to hold the header; also reused to
      * encode/decode a single element length.
      */
    private final byte[] buffer = new byte[16];
     
    /**
      * Constructs a new queue backed by the given file. Only one {@code QueueFile}
      * instance should access a given file at a time.
      */
    public QueueFile(File file) throws IOException {
    if (!file.exists()) initialize(file);
    raf = open(file);
    readHeader();
    }
     
    /**
     * Test hook: adopts an already opened file and loads the queue state from
     * its header.
     */
    QueueFile(RandomAccessFile raf) throws IOException {
      this.raf = raf;
      this.readHeader();
    }
     
    /**
     * Encodes {@code value} into four consecutive bytes of {@code buffer},
     * most significant byte first, starting at {@code offset}. Produces the same
     * byte layout as {@link RandomAccessFile#writeInt}.
     */
    private static void writeInt(byte[] buffer, int offset, int value) {
      for (int i = 0, shift = 24; i < 4; i++, shift -= 8) {
        buffer[offset + i] = (byte) (value >> shift);
      }
    }
     
    /**
     * Encodes each of {@code values} back to back from the start of
     * {@code buffer}, four bytes apiece, as {@link RandomAccessFile#writeInt}
     * would for each value.
     */
    private static void writeInts(byte[] buffer, int... values) {
      for (int index = 0; index < values.length; index++) {
        writeInt(buffer, index * 4, values[index]);
      }
    }
     
    /**
     * Decodes a big-endian int from the four bytes of {@code buffer} that start
     * at {@code offset}.
     */
    private static int readInt(byte[] buffer, int offset) {
      int decoded = 0;
      for (int i = 0; i < 4; i++) {
        // Mask to strip sign extension before merging the byte in.
        decoded = (decoded << 8) | (buffer[offset + i] & 0xff);
      }
      return decoded;
    }
     
    /**
     * Loads the 16-byte header from the start of the file and refreshes the
     * cached file length, element count and first/last element pointers.
     */
    private void readHeader() throws IOException {
      raf.seek(0);
      raf.readFully(buffer);

      fileLength = readInt(buffer, 0);
      elementCount = readInt(buffer, 4);

      // Decode both positions before resolving either element.
      final int firstPosition = readInt(buffer, 8);
      final int lastPosition = readInt(buffer, 12);

      first = readElement(firstPosition);
      last = readElement(lastPosition);
    }
     
    /**
     * Writes the header in a single write at offset 0; this is the commit point
     * of every mutation. Only the on-disk state is touched: the arguments carry
     * the new values and the caller updates the member fields *after* this call
     * succeeds. Atomicity relies on the file system writing a segment
     * atomically.
     */
    private void writeHeader(int fileLength, int elementCount, int firstPosition,
        int lastPosition) throws IOException {
      writeInt(buffer, 0, fileLength);
      writeInt(buffer, 4, elementCount);
      writeInt(buffer, 8, firstPosition);
      writeInt(buffer, 12, lastPosition);
      raf.seek(0);
      raf.write(buffer);
    }
     
    /**
     * Returns the Element stored at the given file position, or
     * {@link Element#NULL} when {@code position} is 0.
     *
     * <p>The 4-byte length prefix is fetched through {@link #ringRead} because
     * {@code add} writes it with {@code ringWrite}: an element that starts
     * fewer than 4 bytes before the end of the file has its length prefix split
     * across the end of the file and the start of the ring. A plain
     * {@code raf.readInt()} would misread such a prefix or hit EOF.
     *
     * @param position absolute file position of the element header
     */
    private Element readElement(int position) throws IOException {
      if (position == 0) return Element.NULL;
      // Local scratch so callers that are still using 'buffer' aren't clobbered.
      byte[] lengthBytes = new byte[Element.HEADER_LENGTH];
      ringRead(position, lengthBytes, 0, Element.HEADER_LENGTH);
      return new Element(position, readInt(lengthBytes, 0));
    }
     
    /**
     * Creates a new, empty queue file. The header is written to a sibling
     * {@code .tmp} file which is then renamed into place, so a
     * partially-initialized file is never left at the target path.
     */
    private static void initialize(File file) throws IOException {
      File scratch = new File(file.getPath() + ".tmp");
      RandomAccessFile out = open(scratch);
      try {
        out.setLength(INITIAL_LENGTH);
        out.seek(0);
        byte[] emptyHeader = new byte[16];
        writeInts(emptyHeader, INITIAL_LENGTH, 0, 0, 0);
        out.write(emptyHeader);
      } finally {
        out.close();
      }

      // Publish the finished file; the rename is relied on to be atomic.
      boolean renamed = scratch.renameTo(file);
      if (!renamed) {
        throw new IOException("Rename failed!");
      }
    }
     
    /**
     * Opens {@code file} for reading and writing in {@code "rwd"} mode, i.e.
     * content updates are written synchronously to the storage device.
     */
    private static RandomAccessFile open(File file) throws FileNotFoundException {
      final String synchronousReadWrite = "rwd";
      return new RandomAccessFile(file, synchronousReadWrite);
    }
     
    /**
     * Maps a position that has run past the end of the file back into the ring,
     * just after the file header. Positions inside the file are returned
     * unchanged.
     */
    private int wrapPosition(int position) {
      if (position < fileLength) {
        return position;
      }
      return HEADER_LENGTH + position - fileLength;
    }
     
    /**
     * Writes {@code count} bytes of {@code buffer} into the ring at
     * {@code position}. The position is wrapped first; if the data would run
     * past the end of the file, the remainder continues right after the header.
     *
     * @param position in file to write to
     * @param buffer to write from
     * @param offset index in {@code buffer} of the first byte to write
     * @param count # of bytes to write
     */
    private void ringWrite(int position, byte[] buffer, int offset, int count)
        throws IOException {
      int start = wrapPosition(position);
      raf.seek(start);
      if (start + count <= fileLength) {
        // Fits before the EOF: one contiguous write.
        raf.write(buffer, offset, count);
        return;
      }

      // Straddles the EOF: fill up to the end, then continue after the header.
      int beforeEof = fileLength - start;
      raf.write(buffer, offset, beforeEof);
      raf.seek(HEADER_LENGTH);
      raf.write(buffer, offset + beforeEof, count - beforeEof);
    }
     
    /**
     * Reads {@code count} bytes from the ring at {@code position} into
     * {@code buffer}, starting at {@code offset}. The position is wrapped first;
     * if the data runs past the end of the file, the remainder is read from
     * right after the header.
     *
     * @param position in file to read from
     * @param buffer to read into
     * @param offset index in {@code buffer} at which the first byte is stored
     * @param count # of bytes to read
     */
    private void ringRead(int position, byte[] buffer, int offset, int count)
        throws IOException {
      position = wrapPosition(position);
      if (position + count <= fileLength) {
        raf.seek(position);
        // Honor the caller's offset (this branch previously always wrote at
        // index 0, corrupting reads into the middle of a buffer).
        raf.readFully(buffer, offset, count);
      } else {
        // The read overlaps the EOF.
        // # of bytes to read before the EOF.
        int beforeEof = fileLength - position;
        raf.seek(position);
        raf.readFully(buffer, offset, beforeEof);
        raf.seek(HEADER_LENGTH);
        raf.readFully(buffer, offset + beforeEof, count - beforeEof);
      }
    }
     
    /**
     * Appends the whole of {@code data} as a new element at the end of the
     * queue.
     *
     * @param data to copy bytes from
     */
    public void add(byte[] data) throws IOException {
      final int wholeArray = data.length;
      add(data, 0, wholeArray);
    }
     
    /**
     * Appends a slice of {@code data} as a new element at the end of the queue.
     * The element is written first and only becomes visible once the header is
     * rewritten; in-memory state is updated after that commit succeeds.
     *
     * @param data to copy bytes from
     * @param offset to start from in {@code data}
     * @param count number of bytes to copy
     *
     * @throws IndexOutOfBoundsException if {@code offset < 0} or
     * {@code count < 0}, or if {@code offset + count} is bigger than the length
     * of {@code data}.
     */
    public synchronized void add(byte[] data, int offset, int count)
        throws IOException {
      Objects.nonNull(data, "buffer");
      boolean negativeArgument = (offset | count) < 0;
      if (negativeArgument || count > data.length - offset) {
        throw new IndexOutOfBoundsException();
      }

      expandIfNecessary(count);

      // The new element goes right behind the current last one, or at the
      // start of the ring when the queue is empty.
      final boolean wasEmpty = isEmpty();
      final int position;
      if (wasEmpty) {
        position = HEADER_LENGTH;
      } else {
        position = wrapPosition(last.position + Element.HEADER_LENGTH + last.length);
      }
      Element appended = new Element(position, count);

      // Length prefix, then payload.
      writeInt(buffer, 0, count);
      ringWrite(appended.position, buffer, 0, Element.HEADER_LENGTH);
      ringWrite(appended.position + Element.HEADER_LENGTH, data, offset, count);

      // Commit. For a previously empty queue the new element is also the first.
      int committedFirst = wasEmpty ? appended.position : first.position;
      writeHeader(fileLength, elementCount + 1, committedFirst, appended.position);
      last = appended;
      elementCount++;
      if (wasEmpty) {
        first = last;
      }
    }
     
    /**
     * Returns the number of bytes of the file in use: the file header plus
     * every stored element (length prefix and data).
     */
    private int usedBytes() {
      if (elementCount == 0) {
        return HEADER_LENGTH;
      }

      int lastElementSize = Element.HEADER_LENGTH + last.length;
      if (last.position < first.position) {
        // tail < head: the queue wraps. Front of the file (header included) up
        // to the end of the last element, plus first element to end of file.
        return last.position + lastElementSize + fileLength - first.position;
      }

      // Contiguous queue: span from first to last, the last entry, the header.
      return (last.position - first.position) + lastElementSize + HEADER_LENGTH;
    }
     
    /**
     * Returns how many bytes of the current file are not in use.
     */
    private int remainingBytes() {
      int used = usedBytes();
      return fileLength - used;
    }
     
    /**
     * Reports whether the queue currently holds no elements.
     */
    public synchronized boolean isEmpty() {
      return !(elementCount != 0);
    }
     
    /**
     * If necessary, expands the file to accommodate an additional element of the
     * given length. The file length is doubled until the element fits. Any part
     * of the ring that had wrapped around to the front of the file is then
     * copied to just past the old end of file so the queue is contiguous again,
     * and finally the header is rewritten to commit the new length.
     *
     * <p>"Wrapped" is decided by where the last element <em>ends</em>, not where
     * it starts: the last element may begin before the old end of file and
     * continue after the header. Such a straddling element (and the
     * exactly-full ring) must be relocated too, otherwise its tail would be read
     * from the freshly added, uninitialized region after growth.
     *
     * @param dataLength length of data being added
     */
    private void expandIfNecessary(int dataLength) throws IOException {
      int elementLength = Element.HEADER_LENGTH + dataLength;
      int remainingBytes = remainingBytes();
      if (remainingBytes >= elementLength) return;

      // Expand.
      int previousLength = fileLength;
      int newLength;
      // Double the length until we can fit the new data.
      do {
        remainingBytes += previousLength;
        newLength = previousLength << 1;
        previousLength = newLength;
      } while (remainingBytes < elementLength);
      raf.setLength(newLength);

      // Position just past the last element, wrapped against the OLD length.
      int endOfLastElement = wrapPosition(
          last.position + Element.HEADER_LENGTH + last.length);

      // If the buffer is split, we need to make it contiguous. An empty queue
      // has first.position == 0 and never satisfies this condition.
      if (endOfLastElement <= first.position) {
        FileChannel channel = raf.getChannel();
        channel.position(fileLength); // destination position
        // Everything between the file header and the end of the last element.
        int count = endOfLastElement - HEADER_LENGTH;
        if (channel.transferTo(HEADER_LENGTH, count, channel) != count) {
          throw new AssertionError("Copied insufficient number of bytes!");
        }
      }

      // Commit the expansion. The last element's position only moves when the
      // element itself started in the wrapped (now relocated) region.
      if (last.position < first.position) {
        int newLastPosition = fileLength + last.position - HEADER_LENGTH;
        writeHeader(newLength, elementCount, first.position, newLastPosition);
        last = new Element(newLastPosition, last.length);
      } else {
        writeHeader(newLength, elementCount, first.position, last.position);
      }
      fileLength = newLength;
    }
     
    /**
     * Returns a copy of the eldest element's data without removing it, or
     * {@code null} if the queue is empty.
     */
    public synchronized byte[] peek() throws IOException {
      if (isEmpty()) {
        return null;
      }
      byte[] data = new byte[first.length];
      int dataStart = first.position + Element.HEADER_LENGTH;
      ringRead(dataStart, data, 0, data.length);
      return data;
    }
     
    /**
     * Hands the eldest element to {@code reader} as a stream; does nothing when
     * the queue is empty.
     */
    public synchronized void peek(ElementReader reader) throws IOException {
      if (elementCount <= 0) {
        return;
      }
      InputStream eldest = new ElementInputStream(first);
      reader.read(eldest, first.length);
    }
     
    /**
     * Walks the queue from eldest to most recently added element, invoking
     * {@code reader} once per element.
     */
    public synchronized void forEach(ElementReader reader) throws IOException {
      int position = first.position;
      int visited = 0;
      while (visited < elementCount) {
        Element current = readElement(position);
        reader.read(new ElementInputStream(current), current.length);
        // The next element starts right after this one's header and data.
        int end = current.position + Element.HEADER_LENGTH + current.length;
        position = wrapPosition(end);
        visited++;
      }
    }
     
    /**
     * Streams the data bytes of a single element, following the ring across the
     * end of the file when the element wraps. Reads go straight to the
     * underlying file, so the stream is only meaningful while the queue is not
     * modified.
     */
    private class ElementInputStream extends InputStream {
      /** File position of the next unread byte. */
      private int position;
      /** Number of data bytes not yet handed out. */
      private int remaining;

      private ElementInputStream(Element element) {
        position = wrapPosition(element.position + Element.HEADER_LENGTH);
        remaining = element.length;
      }

      /**
       * Reads up to {@code length} bytes of the element into {@code buffer}.
       *
       * @return the number of bytes read, 0 if {@code length} is 0, or -1 once
       *     the element has been fully consumed (per the {@link InputStream}
       *     contract; previously 0 was returned at end of element, which makes
       *     read-until-EOF loops spin forever)
       */
      @Override public int read(byte[] buffer, int offset, int length)
          throws IOException {
        Objects.nonNull(buffer, "buffer");
        if ((offset | length) < 0 || length > buffer.length - offset) {
          throw new ArrayIndexOutOfBoundsException();
        }
        if (length == 0) return 0;
        if (remaining == 0) return -1;
        if (length > remaining) length = remaining;
        ringRead(position, buffer, offset, length);
        position = wrapPosition(position + length);
        remaining -= length;
        return length;
      }

      /** Reads the next data byte, or returns -1 at the end of the element. */
      @Override public int read() throws IOException {
        if (remaining == 0) return -1;
        raf.seek(position);
        int b = raf.read();
        position = wrapPosition(position + 1);
        remaining--;
        return b;
      }
    }
     
    /**
     * Returns how many elements the queue currently holds.
     */
    public synchronized int size() {
      final int count = elementCount;
      return count;
    }
     
    /**
     * Removes the eldest element. Removing the only element clears the queue;
     * otherwise the header is rewritten to point at the following element.
     *
     * @throws NoSuchElementException if the queue is empty
     */
    public synchronized void remove() throws IOException {
      if (isEmpty()) {
        throw new NoSuchElementException();
      }
      if (elementCount == 1) {
        clear();
        return;
      }

      // More than one element: the second one becomes the new head.
      int nextPosition = wrapPosition(
          first.position + Element.HEADER_LENGTH + first.length);
      ringRead(nextPosition, buffer, 0, Element.HEADER_LENGTH);
      int nextLength = readInt(buffer, 0);

      // Commit on disk first, then mirror the change in memory.
      writeHeader(fileLength, elementCount - 1, nextPosition, last.position);
      elementCount--;
      first = new Element(nextPosition, nextLength);
    }
     
    /**
     * Empties the queue: shrinks the file back to the initial size if it had
     * grown, writes an empty header, and resets the in-memory state.
     */
    public synchronized void clear() throws IOException {
      boolean grown = fileLength > INITIAL_LENGTH;
      if (grown) {
        raf.setLength(INITIAL_LENGTH);
      }
      writeHeader(INITIAL_LENGTH, 0, 0, 0);

      elementCount = 0;
      first = Element.NULL;
      last = Element.NULL;
      fileLength = INITIAL_LENGTH;
    }
     
    /**
     * Releases the underlying file by closing it.
     */
    public synchronized void close() throws IOException {
      this.raf.close();
    }
     
    /**
     * Describes the queue for debugging: file length, element count, first and
     * last pointers, and the length of every element. An I/O failure while
     * listing lengths is reported via {@code Square.warning} and the partial
     * list is returned.
     */
    @Override public String toString() {
      final StringBuilder text = new StringBuilder();
      text.append(getClass().getSimpleName()).append('[')
          .append("fileLength=").append(fileLength)
          .append(", size=").append(elementCount)
          .append(", first=").append(first)
          .append(", last=").append(last)
          .append(", element lengths=[");
      try {
        forEach(new ElementReader() {
          // Number of lengths appended so far; drives the separator.
          int emitted = 0;
          public void read(InputStream in, int length) throws IOException {
            if (emitted > 0) {
              text.append(", ");
            }
            text.append(length);
            emitted++;
          }
        });
      } catch (IOException e) {
        Square.warning(e);
      }
      return text.append("]]").toString();
    }
     
    /** Immutable pointer to an element stored in the file: where it is and how long its data is. */
    static class Element {

      /** Size in bytes of the length prefix stored in front of each element's data. */
      static final int HEADER_LENGTH = 4;

      /** Placeholder meaning "no element" (position 0, length 0). */
      static final Element NULL = new Element(0, 0);

      /** File position of the element's length prefix. */
      final int position;

      /** Number of data bytes, not counting the length prefix. */
      final int length;

      /**
       * Creates a pointer to an element.
       *
       * @param position within file
       * @param length of data
       */
      Element(int position, int length) {
        this.position = position;
        this.length = length;
      }

      @Override public String toString() {
        StringBuilder text = new StringBuilder(getClass().getSimpleName());
        text.append("[position = ").append(position);
        text.append(", length = ").append(length).append("]");
        return text.toString();
      }
    }
     
    /**
     * Callback that consumes queue elements as streams, so an element can be
     * read in pieces rather than being copied whole into a byte[].
     */
    public interface ElementReader {

      /*
       * TODO: Support remove() call from read().
       */

      /**
       * Invoked once for each element handed to the reader.
       *
       * @param in stream over the element's data. A read returns as many bytes
       * as requested unless fewer remain, in which case it returns all the
       * remaining bytes.
       * @param length of element data in bytes
       */
      void read(InputStream in, int length) throws IOException;
    }
    }
     
    QueueFileTest.java:
     
    package com.squareup.util;
     
    import android.test.AndroidTestCase;
    import com.squareup.Square;
    import java.io.File;
    import java.io.FileNotFoundException;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.RandomAccessFile;
    import java.util.Arrays;
    import java.util.LinkedList;
    import java.util.Queue;
    import junit.framework.ComparisonFailure;
     
    /**
     * Tests for QueueFile.
     *
     * @author Bob Lee (bob@squareup.com)
     */
    public class QueueFileTest extends AndroidTestCase {
     
    /**
      * Number of test payloads. {@code values[i]} is {@code i} bytes long, so the
      * payloads have lengths 0..N-1 and, with a 4-byte length prefix each, take up
      * N*(N-1)/2 + 4*N = 33147 bytes in the queue.
      *
      * NOTE(review): the original comment stated 33401 bytes (N*(N+1)/2+4*N),
      * which matches lengths 1..N rather than the 0..N-1 generated below. It also
      * said 254 was picked instead of 255 so that the number of bytes isn't a
      * multiple of 4 -- confirm the intended sizing.
      */
    private static int N = 254; // see note above
    /** Payloads used by the tests; filled in by the static initializer below. */
    private static byte[][] values = new byte[N][];
    static {
    for (int i = 0; i < N; i++) {
    byte[] value = new byte[i];
    // Each payload counts down from its own length to 1.
    // Example: values[3] = { 3, 2, 1 }
    for (int ii = 0; ii < i; ii++) value[ii] = (byte) (i - ii);
    values[i] = value;
    }
    }
     
    /** Backing file for the queue under test; assigned in setUp(). */
    private File file;
     
    /** Points {@code file} at the app-private "test.queue" and removes any leftover copy. */
    @Override protected void setUp() throws Exception {
      this.file = getContext().getFileStreamPath("test.queue");
      this.file.delete();
    }
     
    /** Deletes the queue file used by the test. */
    @Override protected void tearDown() throws Exception {
      this.file.delete();
    }
     
    /**
     * Adds a single element and checks it can be peeked both immediately and
     * after reopening the file, i.e. that 'first' is updated and persisted
     * correctly.
     */
    public void testAddOneElement() throws IOException {
      QueueFile queue = new QueueFile(file);
      byte[] expected = values[253];
      queue.add(expected);
      assertEquals(expected, queue.peek());
      queue.close();

      queue = new QueueFile(file);
      assertEquals(expected, queue.peek());
      // Release the reopened handle before tearDown() deletes the file.
      queue.close();
    }
     
    /**
     * Runs five add/remove rounds against the same file, reopening it each
     * round, and checks FIFO order against an in-memory queue.
     */
    public void testAddAndRemoveElements() throws IOException {
      final long startedAt = System.nanoTime();

      Queue<byte[]> expected = new LinkedList<byte[]>();

      for (int round = 0; round < 5; round++) {
        QueueFile queue = new QueueFile(file);
        for (byte[] value : values) {
          queue.add(value);
          expected.add(value);
        }

        // Leave (round + 1) elements behind each round, 15 in total after 5
        // rounds. Removing all the elements would be like starting with an
        // empty queue.
        int removals = N - round - 1;
        for (int removed = 0; removed < removals; removed++) {
          assertEquals(expected.remove(), queue.peek());
          queue.remove();
        }
        queue.close();
      }

      // Remove and validate remaining 15 elements.
      QueueFile queue = new QueueFile(file);
      assertEquals(15, queue.size());
      assertEquals(expected.size(), queue.size());
      while (!expected.isEmpty()) {
        assertEquals(expected.remove(), queue.peek());
        queue.remove();
      }
      queue.close();

      // length() returns 0, but I checked the size w/ 'ls', and it is correct.
      // assertEquals(65536, file.length());

      long elapsedMillis = (System.nanoTime() - startedAt) / 1000000;
      Square.debug("Ran in " + elapsedMillis + "ms.");
    }
     
    /**
     * Tests queue expansion when the data crosses EOF: the queue is filled,
     * drained down to one element near the end of the file, then refilled so
     * that new elements wrap around before the file has to grow.
     */
    public void testSplitExpansion() throws IOException {
      // The first 80 payloads (lengths 0..79) plus their length prefixes.
      final int max = 80;

      Queue<byte[]> expected = new LinkedList<byte[]>();
      QueueFile queue = new QueueFile(file);

      int added = 0;
      while (added < max) {
        expected.add(values[added]);
        queue.add(values[added]);
        added++;
      }

      // Remove all but 1.
      for (int removed = 1; removed < max; removed++) {
        assertEquals(expected.remove(), queue.peek());
        queue.remove();
      }

      // This should wrap around before expanding.
      for (byte[] value : values) {
        expected.add(value);
        queue.add(value);
      }

      // Everything must come back out in insertion order.
      while (!expected.isEmpty()) {
        assertEquals(expected.remove(), queue.peek());
        queue.remove();
      }

      queue.close();
    }
     
    /**
     * Checks that an add whose header commit fails leaves the queue unchanged
     * and usable: a later add succeeds and only the committed elements are
     * visible after reopening.
     */
    public void testFailedAdd() throws IOException {
      QueueFile queueFile = new QueueFile(file);
      queueFile.add(values[253]);
      queueFile.close();

      final BrokenRandomAccessFile braf = new BrokenRandomAccessFile(file, "rwd");
      queueFile = new QueueFile(braf);

      try {
        queueFile.add(values[252]);
        fail();
      } catch (IOException e) { /* expected */ }

      braf.rejectCommit = false;

      // Allow a subsequent add to succeed.
      queueFile.add(values[251]);

      queueFile.close();

      queueFile = new QueueFile(file);
      assertEquals(2, queueFile.size());
      assertEquals(values[253], queueFile.peek());
      queueFile.remove();
      assertEquals(values[251], queueFile.peek());
      // Release the reopened handle before tearDown() deletes the file.
      queueFile.close();
    }
     
    /**
     * Checks that a removal whose header commit fails leaves the element in
     * place, and that the queue keeps working after being reopened.
     */
    public void testFailedRemoval() throws IOException {
      QueueFile queueFile = new QueueFile(file);
      queueFile.add(values[253]);
      queueFile.close();

      final BrokenRandomAccessFile braf = new BrokenRandomAccessFile(file, "rwd");
      queueFile = new QueueFile(braf);

      try {
        queueFile.remove();
        fail();
      } catch (IOException e) { /* expected */ }

      queueFile.close();

      queueFile = new QueueFile(file);
      assertEquals(1, queueFile.size());
      assertEquals(values[253], queueFile.peek());

      queueFile.add(values[99]);
      queueFile.remove();
      assertEquals(values[99], queueFile.peek());
      // Release the reopened handle before tearDown() deletes the file.
      queueFile.close();
    }
     
    /**
     * Checks recovery from an expansion whose header commit fails: after
     * reopening, the queue still reports the original length and contents and
     * accepts further operations.
     */
    public void testFailedExpansion() throws IOException {
      QueueFile queueFile = new QueueFile(file);
      queueFile.add(values[253]);
      queueFile.close();

      final BrokenRandomAccessFile braf = new BrokenRandomAccessFile(file, "rwd");
      queueFile = new QueueFile(braf);

      try {
        // This should trigger an expansion which should fail.
        queueFile.add(new byte[8000]);
        fail();
      } catch (IOException e) { /* expected */ }

      queueFile.close();

      queueFile = new QueueFile(file);

      assertEquals(1, queueFile.size());
      assertEquals(values[253], queueFile.peek());
      assertEquals(4096, queueFile.fileLength);

      queueFile.add(values[99]);
      queueFile.remove();
      assertEquals(values[99], queueFile.peek());
      // Release the reopened handle before tearDown() deletes the file.
      queueFile.close();
    }
     
    /**
     * Exercises {@code peek(ElementReader)}: bulk and byte-at-a-time reads of
     * the eldest element, before and after a removal.
     */
    public void testPeakWithElementReader() throws IOException {
      QueueFile queueFile = new QueueFile(file);
      final byte[] a = { 1, 2 };
      queueFile.add(a);
      final byte[] b = { 3, 4, 5 };
      queueFile.add(b);

      // Bulk read of the eldest element.
      queueFile.peek(new QueueFile.ElementReader() {
        public void read(InputStream in, int length) throws IOException {
          assertEquals(2, length);
          byte[] actual = new byte[length];
          in.read(actual);
          assertEquals(a, actual);
        }
      });

      // Peeking again yields the same element; read it byte by byte to EOF.
      queueFile.peek(new QueueFile.ElementReader() {
        public void read(InputStream in, int length) throws IOException {
          assertEquals(2, length);
          assertEquals(1, in.read());
          assertEquals(2, in.read());
          assertEquals(-1, in.read());
        }
      });

      queueFile.remove();

      queueFile.peek(new QueueFile.ElementReader() {
        public void read(InputStream in, int length) throws IOException {
          assertEquals(3, length);
          byte[] actual = new byte[length];
          in.read(actual);
          assertEquals(b, actual);
        }
      });

      assertEquals(b, queueFile.peek());
      assertEquals(1, queueFile.size());
      // Release the handle before tearDown() deletes the file.
      queueFile.close();
    }
     
    /**
     * Checks that {@code forEach} visits every element once, eldest first, and
     * leaves the queue contents untouched.
     */
    public void testForEach() throws IOException {
      QueueFile queueFile = new QueueFile(file);

      final byte[] a = { 1, 2 };
      queueFile.add(a);
      final byte[] b = { 3, 4, 5 };
      queueFile.add(b);

      // One-element array so the anonymous class can count invocations.
      final int[] iteration = new int[] { 0 };
      QueueFile.ElementReader elementReader = new QueueFile.ElementReader() {
        public void read(InputStream in, int length) throws IOException {
          if (iteration[0] == 0) {
            assertEquals(2, length);
            byte[] actual = new byte[length];
            in.read(actual);
            assertEquals(a, actual);
          } else if (iteration[0] == 1) {
            assertEquals(3, length);
            byte[] actual = new byte[length];
            in.read(actual);
            assertEquals(b, actual);
          } else {
            fail();
          }
          iteration[0]++;
        }
      };

      queueFile.forEach(elementReader);

      assertEquals(a, queueFile.peek());
      assertEquals(2, iteration[0]);
      // Release the handle before tearDown() deletes the file.
      queueFile.close();
    }
     
    /**
     * Asserts that two byte arrays have equal contents, failing with a
     * {@link ComparisonFailure} that shows both arrays otherwise.
     */
    private static void assertEquals(byte[] expected, byte[] actual) {
      if (Arrays.equals(expected, actual)) {
        return;
      }
      String expectedText = Arrays.toString(expected);
      String actualText = Arrays.toString(actual);
      throw new ComparisonFailure(null, expectedText, actualText);
    }
     
    /**
     * A RandomAccessFile that simulates a failed commit: while
     * {@code rejectCommit} is set, a whole-array write issued at file position
     * 0 (where the queue header lives) throws instead of writing.
     */
    static class BrokenRandomAccessFile extends RandomAccessFile {
      /** When true, writes at position 0 are refused. */
      boolean rejectCommit = true;

      BrokenRandomAccessFile(File file, String mode)
          throws FileNotFoundException {
        super(file, mode);
      }

      @Override public void write(byte[] buffer) throws IOException {
        if (rejectCommit) {
          if (getFilePointer() == 0) {
            throw new IOException("No commit for you!");
          }
        }
        super.write(buffer);
      }
    }
    }
     

用户评论