Package ch.cern.dirq
Class QueueSimple
- java.lang.Object
-
- ch.cern.dirq.QueueSimple
-
- All Implemented Interfaces:
Queue
,java.lang.Iterable<java.lang.String>
public class QueueSimple extends java.lang.Object implements Queue
QueueSimple - object oriented interface to a simple directory based queue.
A port of Perl module Directory::Queue::Simple http://search.cpan.org/dist/Directory-Queue/
The documentation from Directory::Queue::Simple module has been adapted for Java.
Usage
// sample producer QueueSimple dirq = new QueueSimple("/tmp/test"); for (int i=0; i < 100; i++) { String name = dirq.add("element " + i); System.out.println("# added element " + i + " as " + name); } // sample consumer dirq = QueueSimple("/tmp/test"); for (String name: dirq) { if (! dirq.lock(name)) { continue; } System.out.println("# reading element " + name); String data = dirq.get(name); // one could use dirq.unlock(name) to only browse the queue... dirq.remove(name); }
Description
This module is very similar to the normal directory queue, but uses a different way to store data in the filesystem, using less directories. Its API is almost identical.
Compared to normal directory queue, this module:- is simpler
- is faster
- uses less space on disk
- can be given existing files to store
- does not support schemas
- can only store and retrieve byte strings
- is not compatible (at filesystem level) with the normal directory queue
Directory Structure
The toplevel directory contains intermediate directories that contain the stored elements, each of them in a file.
The names of the intermediate directories are time based: the element insertion time is used to create a 8-digits long hexadecimal number. The granularity (see the constructor) is used to limit the number of new directories. For instance, with a granularity of 60 (the default), new directories will be created at most once per minute.
Since there is usually a filesystem limit in the number of directories a directory can hold, there is a trade-off to be made. If you want to support many added elements per second, you should use a low granularity to keep small directories. However, in this case, you will create many directories and this will limit the total number of elements you can store.
The elements themselves are stored in files (one per element) with a 14-digits long hexadecimal name SSSSSSSSMMMMMR where:- SSSSSSSS represents the number of seconds since the Epoch
- MMMMM represents the microsecond part of the time since the Epoch
- R is a random hexadecimal digit used to reduce name collisions
A temporary element (being added to the queue) will have a.tmp
suffix.
A locked element will have a hard link with the same name and the.lck
suffix.
Please refer toQueue
for general information about directory queues.
-
-
Nested Class Summary
Nested Classes Modifier and Type Class Description private static class
QueueSimple.DotElementFF
FileFilter class to iterate over temporary or locked elements.private static class
QueueSimple.ElementFF
FileFilter class to iterate over (normal) elements.private static class
QueueSimple.IntermediateDirectoryFF
FileFilter class to iterate over intermediate directories.private static class
QueueSimple.QueueSimpleIterator
Iterator for the simple directory queue (private).
-
Field Summary
Fields Modifier and Type Field Description private static int
DEFAULT_GRANULARITY
private static int
DEFAULT_MAXLOCK
private static int
DEFAULT_MAXTEMP
static java.util.regex.Pattern
DIRECTORY_REGEXP
private java.util.Set<java.nio.file.attribute.PosixFilePermission>
directoryPermissions
private static java.io.FileFilter
DOT_ELEMENT_FF
private static java.io.FileFilter
ELEMENT_FF
static java.util.regex.Pattern
ELEMENT_REGEXP
private java.util.Set<java.nio.file.attribute.PosixFilePermission>
filePermissions
private int
granularity
private static java.io.FileFilter
INTERMEDIATE_DIRECTORY_FF
static java.lang.String
LOCKED_SUFFIX
private static org.slf4j.Logger
logger
private static int
MAX_DIRECTORY_UMASK
private static int
MAX_FILE_UMASK
private static long
MAX_MICRO
private static int
MAX_RNDHEX
private static int
MAX_UMASK
private static long
NANO2MICRO
private int
qMaxLock
private int
qMaxTemp
private java.lang.String
queueId
private java.lang.String
queuePath
private static java.util.Random
rand
private int
rndHex
private static long
SECOND
static java.lang.String
TEMPORARY_SUFFIX
private int
umask
-
Constructor Summary
Constructors Constructor Description QueueSimple(java.lang.String path)
Constructor creating a simple directory queue from the given path.QueueSimple(java.lang.String path, int numask)
Constructor creating a simple directory queue from the given path and umask.
-
Method Summary
All Methods Static Methods Instance Methods Concrete Methods Modifier and Type Method Description java.lang.String
add(byte[] data)
Add byte array data to the queue.java.lang.String
add(java.lang.String data)
Add String data to the queue.private java.nio.file.Path
addDataHelper(java.lang.String dir, byte[] data)
private java.nio.file.Path
addDataHelper(java.lang.String dir, java.lang.String data)
java.lang.String
addPath(java.lang.String path)
Add the given file (identified by its path) to the queue and return the corresponding element name, the file must be on the same filesystem and will be moved to the queue.private java.lang.String
addPathHelper(java.nio.file.Path tmp, java.lang.String dir)
int
count()
Return the number of elements in the queue.private java.nio.file.Path
createFile(java.lang.String path)
private java.lang.String
directoryName()
private static java.util.Set<java.nio.file.attribute.PosixFilePermission>
directoryPerms(int numask)
private static java.lang.String
elementName(int rnd)
private void
ensureDirectory(java.nio.file.Path path)
private static java.util.Set<java.nio.file.attribute.PosixFilePermission>
filePerms(int numask)
java.lang.String
get(java.lang.String name)
Get the given locked element as String data.byte[]
getAsByteArray(java.lang.String name)
Get the given locked element as byte array data.int
getGranularity()
Get the granularity.java.lang.String
getId()
Return a unique identifier for the queue.int
getMaxLock()
Get the default maxLock for purge().int
getMaxTemp()
Get the default maxTemp for purge().private java.nio.file.Path
getNewPath(java.lang.String dir)
java.lang.String
getPath(java.lang.String name)
Get the path of the given locked element.java.lang.String
getQueuePath()
Return the path of the queue.int
getRndHex()
Get the random hexadecimal digit.int
getUmask()
Get the umask.java.util.Iterator<java.lang.String>
iterator()
Iterator for the simple directory queue.boolean
lock(java.lang.String name)
Lock an element in permissive mode.boolean
lock(java.lang.String name, boolean permissive)
Lock an element.void
purge()
Purge the queue by removing unused intermediate directories, removing too old temporary elements and unlocking too old locked elements (aka staled locks); note: this can take a long time on queues with many elements.void
purge(int maxLock)
Purge the queue by removing unused intermediate directories, removing too old temporary elements and unlocking too old locked elements (aka staled locks); note: this can take a long time on queues with many elements.void
purge(int maxLock, int maxTemp)
Purge the queue by removing unused intermediate directories, removing too old temporary elements and unlocking too old locked elements (aka staled locks); note: this can take a long time on queues with many elements.void
remove(java.lang.String name)
Remove a locked element from the queue.QueueSimple
setGranularity(int value)
Set the granularity.QueueSimple
setMaxLock(int value)
Set the default maxLock for purge().QueueSimple
setMaxTemp(int value)
Set the default maxTemp for purge().QueueSimple
setRndHex(int value)
Set the random hexadecimal digit.QueueSimple
setUmask(int value)
Set the umask.private boolean
touchFile(java.io.File file)
boolean
unlock(java.lang.String name)
Unlock an element in non-permissive mode.boolean
unlock(java.lang.String name, boolean permissive)
Unlock an element.
-
-
-
Field Detail
-
logger
private static final org.slf4j.Logger logger
-
TEMPORARY_SUFFIX
public static final java.lang.String TEMPORARY_SUFFIX
- See Also:
- Constant Field Values
-
LOCKED_SUFFIX
public static final java.lang.String LOCKED_SUFFIX
- See Also:
- Constant Field Values
-
DIRECTORY_REGEXP
public static final java.util.regex.Pattern DIRECTORY_REGEXP
-
ELEMENT_REGEXP
public static final java.util.regex.Pattern ELEMENT_REGEXP
-
DEFAULT_GRANULARITY
private static final int DEFAULT_GRANULARITY
- See Also:
- Constant Field Values
-
DEFAULT_MAXLOCK
private static final int DEFAULT_MAXLOCK
- See Also:
- Constant Field Values
-
DEFAULT_MAXTEMP
private static final int DEFAULT_MAXTEMP
- See Also:
- Constant Field Values
-
MAX_RNDHEX
private static final int MAX_RNDHEX
- See Also:
- Constant Field Values
-
MAX_UMASK
private static final int MAX_UMASK
- See Also:
- Constant Field Values
-
MAX_DIRECTORY_UMASK
private static final int MAX_DIRECTORY_UMASK
- See Also:
- Constant Field Values
-
MAX_FILE_UMASK
private static final int MAX_FILE_UMASK
- See Also:
- Constant Field Values
-
SECOND
private static final long SECOND
- See Also:
- Constant Field Values
-
NANO2MICRO
private static final long NANO2MICRO
- See Also:
- Constant Field Values
-
MAX_MICRO
private static final long MAX_MICRO
- See Also:
- Constant Field Values
-
INTERMEDIATE_DIRECTORY_FF
private static final java.io.FileFilter INTERMEDIATE_DIRECTORY_FF
-
ELEMENT_FF
private static final java.io.FileFilter ELEMENT_FF
-
DOT_ELEMENT_FF
private static final java.io.FileFilter DOT_ELEMENT_FF
-
rand
private static java.util.Random rand
-
granularity
private int granularity
-
qMaxLock
private int qMaxLock
-
qMaxTemp
private int qMaxTemp
-
rndHex
private int rndHex
-
umask
private int umask
-
queueId
private java.lang.String queueId
-
queuePath
private java.lang.String queuePath
-
directoryPermissions
private java.util.Set<java.nio.file.attribute.PosixFilePermission> directoryPermissions
-
filePermissions
private java.util.Set<java.nio.file.attribute.PosixFilePermission> filePermissions
-
-
Constructor Detail
-
QueueSimple
public QueueSimple(java.lang.String path) throws java.io.IOException
Constructor creating a simple directory queue from the given path.- Parameters:
path
- path of the directory queue- Throws:
java.io.IOException
- if any file operation fails
-
QueueSimple
public QueueSimple(java.lang.String path, int numask) throws java.io.IOException
Constructor creating a simple directory queue from the given path and umask.- Parameters:
path
- path of the directory queuenumask
- numerical umask of the directory queue- Throws:
java.io.IOException
- if any file operation fails
-
-
Method Detail
-
getQueuePath
public java.lang.String getQueuePath()
Description copied from interface:Queue
Return the path of the queue.- Specified by:
getQueuePath
in interfaceQueue
- Returns:
- queue path
-
getId
public java.lang.String getId()
Description copied from interface:Queue
Return a unique identifier for the queue.
-
add
public java.lang.String add(java.lang.String data) throws java.io.IOException
Description copied from interface:Queue
Add String data to the queue.
-
add
public java.lang.String add(byte[] data) throws java.io.IOException
Description copied from interface:Queue
Add byte array data to the queue.
-
addPath
public java.lang.String addPath(java.lang.String path) throws java.io.IOException
Description copied from interface:Queue
Add the given file (identified by its path) to the queue and return the corresponding element name, the file must be on the same filesystem and will be moved to the queue.
-
get
public java.lang.String get(java.lang.String name) throws java.io.IOException
Description copied from interface:Queue
Get the given locked element as String data.
-
getAsByteArray
public byte[] getAsByteArray(java.lang.String name) throws java.io.IOException
Description copied from interface:Queue
Get the given locked element as byte array data.- Specified by:
getAsByteArray
in interfaceQueue
- Parameters:
name
- name of the element to be retrieved- Returns:
- data associated with the given element
- Throws:
java.io.IOException
-
getPath
public java.lang.String getPath(java.lang.String name)
Description copied from interface:Queue
Get the path of the given locked element.
This pathFile can be read but not removed, you must use the remove() method for this purpose.
-
lock
public boolean lock(java.lang.String name) throws java.io.IOException
Description copied from interface:Queue
Lock an element in permissive mode.
-
lock
public boolean lock(java.lang.String name, boolean permissive) throws java.io.IOException
Description copied from interface:Queue
Lock an element.
-
unlock
public boolean unlock(java.lang.String name) throws java.io.IOException
Description copied from interface:Queue
Unlock an element in non-permissive mode.
-
unlock
public boolean unlock(java.lang.String name, boolean permissive) throws java.io.IOException
Description copied from interface:Queue
Unlock an element.
-
remove
public void remove(java.lang.String name) throws java.io.IOException
Description copied from interface:Queue
Remove a locked element from the queue.
-
count
public int count()
Description copied from interface:Queue
Return the number of elements in the queue.
Locked elements are counted but temporary elements are not.
-
purge
public void purge() throws java.io.IOException
Description copied from interface:Queue
Purge the queue by removing unused intermediate directories, removing too old temporary elements and unlocking too old locked elements (aka staled locks); note: this can take a long time on queues with many elements.
It uses default value for maxTemp and maxLock
-
purge
public void purge(int maxLock) throws java.io.IOException
Description copied from interface:Queue
Purge the queue by removing unused intermediate directories, removing too old temporary elements and unlocking too old locked elements (aka staled locks); note: this can take a long time on queues with many elements.
-
purge
public void purge(int maxLock, int maxTemp) throws java.io.IOException
Description copied from interface:Queue
Purge the queue by removing unused intermediate directories, removing too old temporary elements and unlocking too old locked elements (aka staled locks); note: this can take a long time on queues with many elements.- Specified by:
purge
in interfaceQueue
- Parameters:
maxLock
- maximum time for a locked element (in seconds); if set to 0, locked elements will not be unlocked; if set to null, the object's default value will be usedmaxTemp
- maximum time for a temporary element (in seconds); if set to 0, temporary elements will not be removed if set to null, the object's default value will be used- Throws:
java.io.IOException
- if any file operation fails
-
getGranularity
public int getGranularity()
Get the granularity.- Returns:
- granularity (in seconds)
-
setGranularity
public QueueSimple setGranularity(int value)
Set the granularity.- Parameters:
value
- granularity to be set (in seconds)- Returns:
- the object itself
-
getUmask
public int getUmask()
Get the umask.- Returns:
- numerical umask
-
setUmask
public QueueSimple setUmask(int value)
Set the umask.- Parameters:
value
- umask to be set (numerical)- Returns:
- the object itself
-
getMaxLock
public int getMaxLock()
Get the default maxLock for purge().- Returns:
- maximum lock time (in seconds)
-
setMaxLock
public QueueSimple setMaxLock(int value)
Set the default maxLock for purge().- Parameters:
value
- maximum lock time (in seconds)- Returns:
- the object itself
-
getMaxTemp
public int getMaxTemp()
Get the default maxTemp for purge().- Returns:
- maximum temporary time (in seconds)
-
setMaxTemp
public QueueSimple setMaxTemp(int value)
Set the default maxTemp for purge().- Parameters:
value
- maximum temporary time (in seconds)- Returns:
- the object itself
-
getRndHex
public int getRndHex()
Get the random hexadecimal digit.- Returns:
- numerical hexadecimal digit
-
setRndHex
public QueueSimple setRndHex(int value)
Set the random hexadecimal digit.- Parameters:
value
- hexadecimal digit to be set (numerical)- Returns:
- the object itself
-
directoryPerms
private static java.util.Set<java.nio.file.attribute.PosixFilePermission> directoryPerms(int numask)
-
filePerms
private static java.util.Set<java.nio.file.attribute.PosixFilePermission> filePerms(int numask)
-
directoryName
private java.lang.String directoryName()
-
elementName
private static java.lang.String elementName(int rnd)
-
addPathHelper
private java.lang.String addPathHelper(java.nio.file.Path tmp, java.lang.String dir) throws java.io.IOException
- Throws:
java.io.IOException
-
createFile
private java.nio.file.Path createFile(java.lang.String path) throws java.io.IOException
- Throws:
java.io.IOException
-
getNewPath
private java.nio.file.Path getNewPath(java.lang.String dir) throws java.io.IOException
- Throws:
java.io.IOException
-
addDataHelper
private java.nio.file.Path addDataHelper(java.lang.String dir, byte[] data) throws java.io.IOException
- Throws:
java.io.IOException
-
addDataHelper
private java.nio.file.Path addDataHelper(java.lang.String dir, java.lang.String data) throws java.io.IOException
- Throws:
java.io.IOException
-
ensureDirectory
private void ensureDirectory(java.nio.file.Path path) throws java.io.IOException
- Throws:
java.io.IOException
-
touchFile
private boolean touchFile(java.io.File file)
-
iterator
public java.util.Iterator<java.lang.String> iterator()
Iterator for the simple directory queue.- Specified by:
iterator
in interfacejava.lang.Iterable<java.lang.String>
-
-