`
jgsj
  • 浏览: 962699 次
文章分类
社区版块
存档分类
最新评论

双队列的一种实现

 
阅读更多

介绍

双队列是一种高效的内存数据结构,在多线程编程中,能保证生产者线程的写入和消费者的读出尽量做到最低的影响,避免了共享队列的锁开销。本文将介绍一种双队列的设计,并给出实现代码,然后会举例使用的场景。该双队列在项目中使用,性能也得到了验证。

设计

接下来具体介绍双队列的设计,并且会粘贴少量方法代码,帮助介绍。
本文中讲述的双队列,本质上是两个数组保存写入的Object,一个数组负责写入,另一个被消费者读出,两个数组都对应一个重入锁。数组内写入的数据会被计数。
public class DoubleCachedQueue<T> extends AbstractQueue<T> implements
		BlockingQueue<T>, java.io.Serializable {
	private static final long serialVersionUID = 1L;
	
	private static int default_line_limit = 1000;
	
	private static long max_cache_size = 67108864L;

	private int lineLimit;
	
	private long cacheSize;

	private T[] itemsA;

	private T[] itemsB;

	private ReentrantLock readLock, writeLock;

	private Condition notFull;

	private Condition awake;

	/**
	 * writeArray : in reader's eyes, reader get data from data source and write
	 * data to this line array. readArray : in writer's eyes, writer put data to
	 * data destination from this line array.
	 * 
	 * Because of this is doubleQueue mechanism, the two line will exchange when
	 * time is suitable.
	 * 
	 */
	private T[] writeArray, readArray;

	private volatile int writeCount, readCount;

	private int writeArrayTP, readArrayHP;
	
	private volatile boolean closed = false;

	private int spillSize = 0;

	private long lineRx = 0;

	private long lineTx = 0;

队列实现了阻塞队列的接口,所以在向队列offer数据的时候是阻塞的,同样,取出操作poll也会阻塞。两个数组会在适当的时候进行queueSwitch操作。queueSwitch的条件就是当读者把queue读空了之后,且写入的queue此时不为空的时候,两个queue就会进行交换。在交换的时候,写入queue会被上锁,此时生产者不能让队列里写入数据。一般情况下,queue互换其实就是两个数组的引用互换,将相应的计数器也重置,写队列的计数器此时就清零了,因为queue交换是因为读队列已经被读空。
private long queueSwitch(long timeout, boolean isInfinite)
			throws InterruptedException {
		System.out.println("queue switch");
		writeLock.lock();
		try {
			if (writeCount <= 0) {
				if (closed) {
					return -2;
				}
				try {
					if (isInfinite && timeout <= 0) {
						awake.await();
						return -1;
					} else {
						return awake.awaitNanos(timeout);
					}
				} catch (InterruptedException ie) {
					awake.signal();
					throw ie;
				}
			} else {
				T[] tmpArray = readArray;
				readArray = writeArray;
				writeArray = tmpArray;

				readCount = writeCount;
				readArrayHP = 0;

				writeCount = 0;
				writeArrayTP = 0;

				notFull.signal();
				// logger.debug("Queue switch successfully!");
				return -1;
			}
		} finally {
			writeLock.unlock();
		}
	}

上面queue交换的时候,可以看到当要被交换的写队列也已经为空的时候,会做一次检查。如果此时queue已经被显示地关闭了,那么poll操作就会返回空,读者此时应该检查queue是否已经被closed了,若已经closed了,那么读者已经把queue里的数据读完了。这里的显示close是我们给双队列加的一个状态,close这件事的作用是为了让读者知道:生产者已经停止往queue里写新数据了,但是queue里其实可能还有未取完的数据(在写queue里,此时还差一次queue switch),你往queue poll取数据的时候,如果取到空了,那么应该做一次check,如果queue已经关闭了,那么读者就知道本次读的任务完全结束了。反过来,close状态其实不影响写,生产者如果还想写的话,其实也是可以的,但是我不推荐这么做。
public void close() {
		writeLock.lock();
		try {
			closed = true;
			//System.out.println(this);
			awake.signalAll();
		} finally {
			writeLock.unlock();
		}
	}

如果没有这个close标志位的话,可能就需要消费者放入一个EOF让读者知道。这在只有一个生产者和一个消费者的情况下是可行的,但是如果是一个多对一,一对多,甚至多对多的情况呢?一对一的情况是最简单的,也是双队列被创造出来最合适的场景。因为双队列完全分离了一个生产者和一个消费者的锁争抢情况,各自只要获得自己的读/写队列的锁就可以了。在本文阐述的双队列中,唯一产生一些开销的就是queue swtich的情况,如果queue频繁交换的话,还是会产生一些性能开销的。


一对多

上面已经大致介绍了双队列的读写。在实际项目中,一对多的场景需要注意的地方有两:
  • 单个生产者需要在结束的时候关闭queue
  • 多个消费者需要知道任务结束(知道其他线程已经完成任务)
第一点很简单,比如读文件的话,当生产者readLine()最后为空的时候,就认为数据源已经读完,调用方法把queue close()。而消费者在读queue的时候,有时候可能会由于延迟、queue交换等原因取到空数据,此时就如上面一节所说,消费者线程拿到空数据后应该检查queue的状态,如果queue没有关闭,那么应该等待一小会儿后继续poll数据;如果queue关闭了,那么其实说明该线程已经完成了任务。同理,其他消费者线程也应该在取到空的时候做这样的操作。


消费者之间或者外部有一方需要知道各个消费者线程的存活情况,这样才能知道本次任务完成。比如如果外面有一个上帝的话,可以加一个CountDownLatch计数,每个消费者完成后就countDown一次,外部调用await()直到大家都已经退出,那么整个任务结束。如果没有上帝,线程之间互相知道对方情况的话,我的做法是让生产者放入一个EOF,当某线程取到EOF的时候,他知道自己是第一个遇到尽头的人,他会置一个布尔,而其他线程在取到空的时候会检查该布尔值,这样就能知道是否已经有小伙伴已经拿到EOF了,那么这时候就可以countDown了,而拿到EOF的线程进程countDown后就await(),最后退出。
下面是我自己针对这种场景,使用双队列的方式,其中的fromQueue是一个ConcurrentLinkedQueue,大家可以忽略,toQueue是双队列,可以注意一下用法。特别是往里面写的时候,需要while循环重试直到写入成功。
@Override
public void run() {
	long start = System.currentTimeMillis();
	log.debug(Thread.currentThread() + " Unpacker started at " + start);
		
	Random r = new Random(start);
	Bundle bundle = null;
	boolean shoudShutdown = false;
	try {
		while(!shoudShutdown) {
			bundle = (Bundle) fromQueue.poll();
			if (bundle == null) {
				if (seeEOF.get()) {
					// 当取到空,并且其他线程已经取到EOF,那么本线程将Latch减1,并退出循环
					latch.countDown();
					shoudShutdown = true;
				} else {
					// 如果EOF还没被取到,本线程小睡一会后继续取
					try {
						sleep(r.nextInt(10));
					} catch (InterruptedException e) {
						log.error("Interrupted when taking a nap", e);
					}
				}
			} else if (!bundle.isEof()) {
			// bundle非空且非EOF,则往双队列写入一个Bundle
			byte[] lineBytes = BundleUtil.getDecompressedData(bundle);
			// 放入双队列时,若offer失败则重试
			while (!toQueue.offer(new UnCompressedBundle(bundle.getId(), ByteUtil.bytes2Lines(lineBytes, lineDelim), bundle.getIndex(), bundle.getJobId()))) {
				log.info("Unpacker put failed, will retry");
			}
			log.info("After enqueue, queue size is " + toQueue.size());
			} else {
				// Unpacker获得到了EOF
				seeEOF.set(true);
				// 自己将Lacth减1,并等待其他线程退出
				latch.countDown();
				try {
					latch.await();
				} catch (InterruptedException e) {
					log.error("Interrupted when waiting the latch ");
				}
				// 其他线程已经退出,本线程放入EOF
				while (!toQueue.offer(new UnCompressedBundle(-1L, new Line[0], -1L, -1L))) {
					log.info("Unpacker put EOF failed, will retry");
				}
				// 关闭Queue
				toQueue.close();
				// 退出循环
				shoudShutdown = true;
			}
		}
		log.debug(Thread.currentThread() + " Unpacker finished in " + (System.currentTimeMillis()-start) + " ms");
	} catch (Exception e) {
		log.error("Exception when unpacker is running ", e);
		// 将latch减1,表示自己异常退出,且不再工作
		// latch.countDown();
		log.debug(Thread.currentThread() + " Unpacker occured exception and stopped. ");
	} finally {
		
	}
}

多对一

多个生产者的情况下,写入队列无可避免发送锁争抢,但是能保证消费者的稳定读出过程。没有什么特殊处理的地方,这里就不啰嗦了。

总结分析

本文介绍了一种经典双队列的设计和实现,也给出了一些代码演示。文章末尾我会贴出整个双队列的代码实现,需要的同学也可以留言,我把.java发给你。如果使用的时候有发现问题,不吝赐教,这个双队列的实现也还不是很完美。使用的时候也存在需要注意的地方。
其实双队列的目的还是在于让写和读互相没有影响,而且更加照顾了写的速度。因为一般写的速度可能会比较快,而读的人读出之后还会做一些额外的处理,所以写的这一方借助双队列,可以持续写的过程,而且如果读的一方慢的话,可以多起几个消费者线程,就像"一对多"场景里阐述的那样来使用双队列。

下面是整个实现。各位可以仔细看看,发现问题一定记得通知我 :)

import java.util.AbstractQueue;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;


import lombok.ToString;
import lombok.extern.log4j.Log4j;


/**
 * Represents a region with two swap spaces, one for storing data which from
 * data source, the other one for storing data which will be transferred to data
 * destination.
 * <br>
 * A classical DoubleCachedQueue, In beginning, space A and space B both
 * empty, then loading task begin to load data to space A, when A is almost
 * full, let the data from data source being loaded to space B, then dumping
 * task begin to dump data from space A to data source. When space A is empty,
 * switch the two spaces for load and dump task. Repeat the above operation.
 * 
 */
@Log4j
@ToString
public class DoubleCachedQueue<T> extends AbstractQueue<T> implements
		BlockingQueue<T>, java.io.Serializable {
	private static final long serialVersionUID = 1L;
	
	private static int default_line_limit = 1000;
	
	private static long max_cache_size = 67108864L;

	private int lineLimit;
	
	private long cacheSize;

	private T[] itemsA;

	private T[] itemsB;

	private ReentrantLock readLock, writeLock;

	private Condition notFull;

	private Condition awake;

	/**
	 * writeArray : in reader's eyes, reader get data from data source and write
	 * data to this line array. readArray : in writer's eyes, writer put data to
	 * data destination from this line array.
	 * 
	 * Because of this is doubleQueue mechanism, the two line will exchange when
	 * time is suitable.
	 * 
	 */
	private T[] writeArray, readArray;

	private volatile int writeCount, readCount;

	private int writeArrayTP, readArrayHP;
	
	private volatile boolean closed = false;

	private int spillSize = 0;

	private long lineRx = 0;

	private long lineTx = 0;

	/**
	 * Get info of line number in {@link DoubleCachedQueue} space.
	 * 
	 * @return Information of line number.
	 * 
	 */
	public String info() {
		return String.format("Write Array: %s/%s; Read Array: %s/%s", writeCount, writeArray.length, readCount, readArray.length);
	}

	/**
	 * Use the two parameters to construct a {@link DoubleCachedQueue} which hold the
	 * swap areas.
	 * 
	 * @param lineLimit
	 *            Limit of the line number the {@link DoubleCachedQueue} can hold.
	 * 
	 * @param byteLimit
	 *            Limit of the bytes the {@link DoubleCachedQueue} can hold.
	 * 
	 */
	public DoubleCachedQueue(int lineLimit) {
		if (lineLimit <= 0) {
			this.lineLimit = default_line_limit;
		}else{
			this.lineLimit = lineLimit;
		}
		itemsA = (T[])new Object[lineLimit];
		itemsB = (T[])new Object[lineLimit];

		readLock = new ReentrantLock();
		writeLock = new ReentrantLock();

		notFull = writeLock.newCondition();
		awake = writeLock.newCondition();

		readArray = itemsA;
		writeArray = itemsB;
		spillSize = lineLimit * 8 / 10;
	}
	
	public DoubleCachedQueue(long cacheSize){
		if (cacheSize <= 0) {
			throw new IllegalArgumentException(
					"Queue initial capacity can't less than 0!");
		}
		this.cacheSize = cacheSize > max_cache_size ? max_cache_size : cacheSize;
		
		readLock = new ReentrantLock();
		writeLock = new ReentrantLock();

		notFull = writeLock.newCondition();
		awake = writeLock.newCondition();

		readArray = itemsA;
		writeArray = itemsB;
		spillSize = lineLimit * 8 / 10;
	}

	/**
	 * Get line number of the {@link DoubleCachedQueue}
	 * 
	 * @return lineLimit Limit of the line number the {@link DoubleCachedQueue} can
	 *         hold.
	 * 
	 */
	public int getLineLimit() {
		return lineLimit;
	}

	/**
	 * Set line number of the {@link DoubleCachedQueue}.
	 * 
	 * @param capacity
	 *            Limit of the line number the {@link DoubleCachedQueue} can hold.
	 * 
	 */
	public void setLineLimit(int capacity) {
		this.lineLimit = capacity;
	}

	/**
	 * Insert one line of record to a apace which buffers the swap data.
	 * 
	 * @param line
	 *            The inserted line.
	 * 
	 */
	private void insert(T line) {
		writeArray[writeArrayTP] = line;
		++writeArrayTP;
		++writeCount;
		++lineRx;
	}

	/**
	 * Insert a line array(appointed the limit of array size) of data to a apace
	 * which buffers the swap data.
	 * 
	 * @param lines
	 *            Inserted line array.
	 * 
	 * @param size
	 *            Limit of inserted size of the line array.
	 * 
	 */
	private void insert(T[] lines, int size) {
		if(size > 0){
			System.arraycopy(lines, 0, writeArray, writeArrayTP, size);
			writeArrayTP = writeArrayTP + size;
			writeCount = writeCount + size;
			lineRx = lineRx + size;
		}
//		for (int i = 0; i < size; ++i) {
//			writeArray[writeArrayTP] = lines[i];
//			++writeArrayTP;
//			++writeCount;
//			++lineRx;
//			if(lines[i] != null && lines[i].getLine() != null){
//				byteRx += lines[i].getLine().length();
//			}
//		}
	}

	/**
	 * Extract one line of record from the space which contains current data.
	 * 
	 * @return line A line of data.
	 * 
	 */
	private T extract() {
		T e = readArray[readArrayHP];
		readArray[readArrayHP] = null;
		++readArrayHP;
		--readCount;
		++lineTx;
		return e;
	}

	/**
	 * Extract a line array of data from the space which contains current data.
	 * 
	 * @param ea
	 * @return Extracted line number of data.
	 * 
	 */
	private int extract(T[] ea) {
		int readsize = Math.min(ea.length, readCount);
		if(readsize > 0){
			readCount = readCount - readsize;
			lineTx = lineTx + readsize;
			System.arraycopy(readArray, readArrayHP, ea, 0, readsize);
			readArrayHP = readArrayHP + readsize;
		}
//		for (int i = 0; i < readsize; ++i) {
//			ea[i] = readArray[readArrayHP];
//			readArray[readArrayHP] = null;
//			++readArrayHP;
//			--readCount;
//			++lineTx;
//		}
		return readsize;
	}

	/**
	 * switch condition: read queue is empty && write queue is not empty.
	 * Notice:This function can only be invoked after readLock is grabbed,or may
	 * cause dead lock.
	 * 
	 * @param timeout
	 * 
	 * @param isInfinite
	 *            whether need to wait forever until some other thread awake it.
	 * 
	 * @return
	 * 
	 * @throws InterruptedException
	 * 
	 */

	private long queueSwitch(long timeout, boolean isInfinite)
			throws InterruptedException {
		System.out.println("queue switch");
		writeLock.lock();
		try {
			if (writeCount <= 0) {
				if (closed) {
					return -2;
				}
				try {
					if (isInfinite && timeout <= 0) {
						awake.await();
						return -1;
					} else {
						return awake.awaitNanos(timeout);
					}
				} catch (InterruptedException ie) {
					awake.signal();
					throw ie;
				}
			} else {
				T[] tmpArray = readArray;
				readArray = writeArray;
				writeArray = tmpArray;

				readCount = writeCount;
				readArrayHP = 0;

				writeCount = 0;
				writeArrayTP = 0;

				notFull.signal();
				// logger.debug("Queue switch successfully!");
				return -1;
			}
		} finally {
			writeLock.unlock();
		}
	}

	/**
	 * If exists write space, it will return true, and write one line to the
	 * space. otherwise, it will try to do that in a appointed time,when time is
	 * out if still failed, return false.
	 * 
	 * @param line
	 *            a Line.
	 * 
	 * @param timeout
	 *            appointed limit time
	 * 
	 * @param unit
	 *            time unit
	 * 
	 * @return True if success,False if failed.
	 * 
	 */
	public boolean offer(T line, long timeout, TimeUnit unit)
			throws InterruptedException {
		if (line == null) {
			throw new NullPointerException();
		}
		long nanoTime = unit.toNanos(timeout);
		writeLock.lockInterruptibly();
		if(itemsA == null || itemsB == null){
			initArray(line);
		}
		try {
			for (;;) {
				if (writeCount < writeArray.length) {
					insert(line);
					if (writeCount == 1) {
						awake.signal();
					}
					return true;
				}

				// Time out
				if (nanoTime <= 0) {
					return false;
				}
				// keep waiting
				try {
					nanoTime = notFull.awaitNanos(nanoTime);
				} catch (InterruptedException ie) {
					notFull.signal();
					throw ie;
				}
			}
		} finally {
			writeLock.unlock();
		}
	}

	private void initArray(T line) {
		
		long recordLength = computeSize(line);
		long size = cacheSize/recordLength;
		if(size <= 0){
			size = default_line_limit;
		}
		lineLimit = (int) size;
		itemsA = (T[])new Object[(int) size];
		itemsB = (T[])new Object[(int) size];
		readArray = itemsA;
		writeArray = itemsB;
		
	}

	public long computeSize(T line){
		return 1;
	}

	/**
	 * If exists write space, it will return true, and write a line array to the
	 * space.<br>
	 * otherwise, it will try to do that in a appointed time,when time out if
	 * still failed, return false.
	 * 
	 * @param lines
	 *            line array contains lines of data
	 * 
	 * @param size
	 *            Line number needs to write to the space.
	 * 
	 * @param timeout
	 *            appointed limit time
	 * 
	 * @param unit
	 *            time unit
	 * 
	 * @return status of this operation, true or false.
	 * 
	 * @throws InterruptedException
	 *             if being interrupted during the try limit time.
	 * 
	 */
	public boolean offer(T[] lines, int size, long timeout, TimeUnit unit)
			throws InterruptedException {
		if (lines == null || lines.length == 0) {
			throw new NullPointerException();
		}
		long nanoTime = unit.toNanos(timeout);
		writeLock.lockInterruptibly();
		if(itemsA == null || itemsB == null){
			initArray(lines[0]);
		}
		try {
			for (;;) {
				if (writeCount + size <= writeArray.length) {
					insert(lines, size);
					if (writeCount >= spillSize) {
						awake.signalAll();
					}
					return true;
				}

				// Time out
				if (nanoTime <= 0) {
					return false;
				}
				// keep waiting
				try {
					nanoTime = notFull.awaitNanos(nanoTime);
				} catch (InterruptedException ie) {
					notFull.signal();
					throw ie;
				}
			}
		} finally {
			writeLock.unlock();
		}
	}

	/**
	 * Close the synchronized lock and one inner state.
	 * 
	 */
	public void close() {
		writeLock.lock();
		try {
			closed = true;
			//System.out.println(this);
			awake.signalAll();
		} finally {
			writeLock.unlock();
		}
	}

	public boolean isClosed() {
		return closed;
	}

	/**
	 * 
	 * 
	 * @param timeout
	 *            appointed limit time
	 * 
	 * @param unit
	 *            time unit
	 */
	public T poll(long timeout, TimeUnit unit) throws InterruptedException {
		long nanoTime = unit.toNanos(timeout);
		readLock.lockInterruptibly();

		try {
			for (;;) {
				if (readCount > 0) {
					return extract();
				}

				if (nanoTime <= 0) {
					return null;
				}
				nanoTime = queueSwitch(nanoTime, true);
			}
		} finally {
			readLock.unlock();
		}
	}

	/**
	 * 
	 * @param ea
	 *            line buffer
	 * 
	 * 
	 * @param timeout
	 *            a appointed limit time
	 * 
	 * @param unit
	 *            a time unit
	 * 
	 * @return line number of data.if less or equal than 0, means fail.
	 * 
	 * @throws InterruptedException
	 *             if being interrupted during the try limit time.
	 */
	public int poll(T[] ea, long timeout, TimeUnit unit)
			throws InterruptedException {
		long nanoTime = unit.toNanos(timeout);
		readLock.lockInterruptibly();

		try {
			for (;;) {
				if (readCount > 0) {
					return extract(ea);
				}

				if (nanoTime == -2) {
					return -1;
				}

				if (nanoTime <= 0) {
					return 0;
				}
				nanoTime = queueSwitch(nanoTime, false);
			}
		} finally {
			readLock.unlock();
		}
	}

	public Iterator<T> iterator() {
		return null;
	}

	/**
	 * Get size of {@link Storage} in bytes.
	 * 
	 * @return Storage size.
	 * 
	 * */
	@Override
	public int size() {
		return (writeCount + readCount);
	}

	@Override
	public int drainTo(Collection<? super T> c) {
		return 0;
	}

	@Override
	public int drainTo(Collection<? super T> c, int maxElements) {
		return 0;
	}

	/**
	 * If exists write space, it will return true, and write one line to the
	 * space.<br>
	 * otherwise, it will try to do that in a appointed time(20
	 * milliseconds),when time out if still failed, return false.
	 * 
	 * @param line
	 *            a Line.
	 * 
	 * @see DoubleCachedQueue#offer(Line, long, TimeUnit)
	 * 
	 */
	@Override
	public boolean offer(T line) {
		try {
			return offer(line, 20, TimeUnit.MILLISECONDS);
		} catch (InterruptedException e1) {
			log.debug(e1.getMessage(), e1);
		}
		return false;
	}

	@Override
	public void put(T e) throws InterruptedException {
	}

	@Override
	public int remainingCapacity() {
		return 0;
	}

	@Override
	public T take() throws InterruptedException {
		return null;
	}

	@Override
	public T peek() {
		return null;
	}

	@Override
	public T poll() {
		try {
			return poll(1*1000, TimeUnit.MILLISECONDS);
		} catch (InterruptedException e) {
			log.debug(e.getMessage(), e);
		}
		return null;
	}

}


(全文完)
分享到:
评论

相关推荐

    PHP实现双向队列

    php实现的基于数组的双向队列的实现,分为两种实现方式,一种是根据数组的key进行进出队列的操作,一种是使用数组自身的性质进行的操作

    栈和队列的实现2-队列

    实现了两种方式的队列,一种是顺序存储队列的实现,一种是链式存储的队列实现,代码中有针对两种实现方法的测试用例和可执行程序,在linux系统下直接make即可。代码较多,尤其是队列的链式存储方式的实现,建议先...

    php实现的双向队列类实例

    (deque,全名double-ended queue)是一种具有队列和栈的性质的数据结构。双向队列中的元素可以从两端弹出,其限定插入和删除操作在表的两端进行。 在实际使用中,还可以有输出受限的双向队列(即一个端点允许插入和...

    论文研究-双队列控制的激光视觉焊缝跟踪系统研究.pdf

    设计与实现了一套激光视觉引导的焊缝...提出一种双队列控制策略,在采集的焊缝特征点的基础上进一步插值,从而实现激光视觉引导的焊缝自动与平滑跟踪。实验结果表明,该系统具有较好的跟踪精度,能满足工业实际需求。

    基于Linux消息队列的简易聊天室(C语言)(附源代码)

    消息队列是System V支持一种IPC机制,通过类似链表的操作向一个FIFO里通过msgsnd发送用户自定义数据,进程可以通过msgrcv来接收指定类似mtype的数据,从而实现进程间通信。 在服务器端实现广播功能,以及服务器退出...

    C语言使用非循环双向链表实现队列

    我们使用一种比较特殊的链表——非循环双向链表来实现队列。首先这里的说明的是构建的是普通的队列,而不是循环队列。当我们使用数组的时候创建循环队列是为了节省存储空间,而来到链表中时,每一个节点都是动态申请...

    亲测可用基于Linux消息队列的简易聊天室(C语言).zip

    消息队列是System V支持一种IPC机制,通过类似链表的操作向一个FIFO里通过msgsnd发送用户自定义数据,进程可以通过msgrcv来接收指定类似mtype的数据,从而实现进程间通信。 在服务器端实现广播功能,以及服务器退出...

    PHP队列原理及基于队列的写文件案例

    队列是一种线性表,按照先进先出的原则进行的: 入队: 出队: PHP实现队列:第一个元素作为队头,最后一个元素作为队尾 &lt;?php /** * 队列就是这么简单 * * @link */ $array = array('PHP', 'JAVA'); ...

    Python collections.deque双边队列原理详解

    队列是一种只允许在一端进行插入操作,而在另一端进行删除操作的线性表。 在Python文档中搜索队列(queue)会发现,Python标准库中包含了四种队列,分别是queue.Queue / asyncio.Queue / multiprocessing.Queue / ...

    C语言通过使用数据结构来实现双向顺序栈

    双向顺序栈(Double-ended sequential stack)是一种栈数据结构,它允许在两个方向上进行入栈和出栈操作。与常规栈不同,双向顺序栈具有两个栈顶位置,一个在栈的一端,称为"头栈顶"(head),另一个在栈的另一端,...

    操作系统双向进程通信.rar_linux_msgget_操作系统_消息队列 _简单进程双向通信

    1)创建一个消息队列用于收发双方通信包含msgget() msgsnd() msgrcv() 2)编写发送端和接收端代码基于消息队列实现双向通信 3)编译调试顺利运行并提交实验报告 4)课后要求对常见的四种进程通信方式进行调研和总结

    双端口RAM实现两路数据的延时

    代码主要介绍一下使用双端口RAM实现延时的过程。(。。。fifo是 first input first output 的缩写,即先进先出队列,fifo一般用作不同时钟域的缓冲器。...本文中讲述的是同步fifo的一种设计方法。)

    基于条件变量的消息队列

    博客园中的其他园主也实现了很多基于环形队列和lock-free的消息队列,很不错,我们将要实现一个基于双缓冲、互斥锁和条件变量的消息队列;这个大概也参考了一下java的blockingqueue,在前面一个博客中有简单介绍!!...

    基于条件变量的消息队列 说明介绍

    博客园中的其他园主也实现了很多基于环形队列和lock-free的消息队列,很不错,今天我们将要实现一个基于双缓冲、互斥锁和条件变量的消息队列;这个大概也参考了一下java的blockingqueue,在前面一个博客中有简单介绍...

    一种基于模糊-比例积分双模控制的主动队列管理算法 (2008年)

    通过将模糊推理方法和Pl算法相结合,提出了一种新的主动队列管理(AQM)算法――基于模糊一比例积分(Fuzzy-PI)的双模态控制主动队列管理算法――Fuzzy-PI AQM算法。该算法的基本思想是当偏差很大时,使用模糊逻辑控制...

    Java-学习Map复习总结

    Map是一种以键(key)值(value)对形式保存数据的机制。 1.2 Map双边队列的格式: interface Map { } 1.3 Map双边队列的两大实现类: HashMap: 底层采用的是哈希表的存储机制。 TreeMap: 底层采用的是平衡二叉树的存储...

    基于双向链表的基数排序

    基数排序(radix sort)又称桶排序(bucket sort),相对于常见的比较排序,基数排序是一种分配式排序,需要将关键字拆分成数字位。并且按照数字位的值对数据项进行排序,这种方法不需要进行比较操作。 为了尽可能少的...

    《数据结构》实验

    熟练掌握在两种存储结构上实现栈和队列的基本运算;学会利用栈和队列解决一些实际问题。 串运算的实现 时数 2 性质 验证 内容:1、若X和Y是用结点大小为1的单链表表示的串,设计算法找出X中第一个不在Y中出现的字符...

    双向广度优先搜索(BBFS)路径规划算法(Python实现)

    基于双向广度优先搜索的路径规划算法是一种常用的图搜索算法,用于寻找两个节点之间的最短路径。该算法从起始节点和目标节点同时进行搜索,通过不断扩展搜索范围,直到两个搜索队列相遇或找到最短路径为止。 核心...

    论文研究-高能物理云平台中的弹性计算资源管理机制.pdf

    系统通过高吞吐量计算系统HTCondor运行计算作业,使用开源的云计算平台Openstack管理虚拟计算节点,给出了一种结合虚拟资源配额服务,基于双阈值的弹性资源管理算法,实现资源池整体伸缩,同时设计了二级缓冲池以...

Global site tag (gtag.js) - Google Analytics