Toto je (hodně) staré vlákno, ale nedávno jsem narazil na podobný problém. Ve skutečnosti jsem potřeboval klonování stdin na stdout s kopií do roury, která není blokující. navrhovaný ftee v první odpovědi tam opravdu pomohl, ale byl (pro můj případ použití) příliš nestálý. To znamená, že jsem ztratil data, která jsem mohl zpracovat, kdybych se k nim dostal včas.
Scénář, kterému jsem čelil, je, že mám proces (some_process), který agreguje některá data a zapisuje své výsledky každé tři sekundy do stdout. (zjednodušené) nastavení vypadalo takto (ve skutečném nastavení používám pojmenované potrubí):
some_process | ftee >(onlineAnalysis.pl > results) | gzip > raw_data.gz
Nyní musí být raw_data.gz zkomprimován a musí být kompletní. ftee dělá tuto práci velmi dobře. Ale trubka, kterou používám uprostřed, byla příliš pomalá na to, aby popadla vyprázdněná data - ale byla dostatečně rychlá na to, aby zpracovala vše, pokud se k ní dalo dostat, což bylo testováno s normálním odpalištěm. Normální odpaliště však blokuje, pokud se s nejmenovanou trubkou něco stane, a protože chci mít možnost se na požádání připojit, odpaliště nepřipadá v úvahu. Zpět k tématu:Zlepšilo se to, když jsem mezi to vložil vyrovnávací paměť, výsledkem bylo:
some_process | ftee >(mbuffer -m 32M| onlineAnalysis.pl > results) | gzip > raw_data.gz
Ale stále to ztrácelo data, která jsem mohl zpracovat. Takže jsem pokračoval a rozšířil ftee navržené dříve na verzi s vyrovnávací pamětí (bftee). Stále má všechny stejné vlastnosti, ale používá (neefektivní?) vnitřní vyrovnávací paměť pro případ, že selže zápis. Stále ztrácí data, pokud je vyrovnávací paměť plná, ale v mém případě to funguje krásně. Jako vždy je zde mnoho prostoru pro zlepšení, ale když jsem odtud zkopíroval kód, rád bych jej sdílel zpět s lidmi, kteří by ho mohli využít.
/* bftee - clone stdin to stdout and to a buffered, non-blocking pipe
(c) [email protected]
(c) [email protected]
WTFPL Licence */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
#include <signal.h>
#include <unistd.h>
// the number of sBuffers that are being held at a maximum
#define BUFFER_SIZE 4096
#define BLOCK_SIZE 2048
typedef struct {
char data[BLOCK_SIZE];
int bytes;
} sBuffer;
typedef struct {
sBuffer *data; //array of buffers
int bufferSize; // number of buffer in data
int start; // index of the current start buffer
int end; // index of the current end buffer
int active; // number of active buffer (currently in use)
int maxUse; // maximum number of buffers ever used
int drops; // number of discarded buffer due to overflow
int sWrites; // number of buffer written to stdout
int pWrites; // number of buffers written to pipe
} sQueue;
void InitQueue(sQueue*, int); // initialized the Queue
void PushToQueue(sQueue*, sBuffer*, int); // pushes a buffer into Queue at the end
sBuffer *RetrieveFromQueue(sQueue*); // returns the first entry of the buffer and removes it or NULL is buffer is empty
sBuffer *PeakAtQueue(sQueue*); // returns the first entry of the buffer but does not remove it. Returns NULL on an empty buffer
void ShrinkInQueue(sQueue *queue, int); // shrinks the first entry of the buffer by n-bytes. Buffer is removed if it is empty
void DelFromQueue(sQueue *queue); // removes the first entry of the queue
static void sigUSR1(int); // signal handled for SUGUSR1 - used for stats output to stderr
static void sigINT(int); // signla handler for SIGKILL/SIGTERM - allows for a graceful stop ?
sQueue queue; // Buffer storing the overflow
volatile int quit; // for quiting the main loop
int main(int argc, char *argv[])
{
int readfd, writefd;
struct stat status;
char *fifonam;
sBuffer buffer;
ssize_t bytes;
int bufferSize = BUFFER_SIZE;
signal(SIGPIPE, SIG_IGN);
signal(SIGUSR1, sigUSR1);
signal(SIGTERM, sigINT);
signal(SIGINT, sigINT);
/** Handle commandline args and open the pipe for non blocking writing **/
if(argc < 2 || argc > 3)
{
printf("Usage:\n someprog 2>&1 | %s FIFO [BufferSize]\n"
"FIFO - path to a named pipe, required argument\n"
"BufferSize - temporary Internal buffer size in case write to FIFO fails\n", argv[0]);
exit(EXIT_FAILURE);
}
fifonam = argv[1];
if (argc == 3) {
bufferSize = atoi(argv[2]);
if (bufferSize == 0) bufferSize = BUFFER_SIZE;
}
readfd = open(fifonam, O_RDONLY | O_NONBLOCK);
if(-1==readfd)
{
perror("bftee: readfd: open()");
exit(EXIT_FAILURE);
}
if(-1==fstat(readfd, &status))
{
perror("bftee: fstat");
close(readfd);
exit(EXIT_FAILURE);
}
if(!S_ISFIFO(status.st_mode))
{
printf("bftee: %s in not a fifo!\n", fifonam);
close(readfd);
exit(EXIT_FAILURE);
}
writefd = open(fifonam, O_WRONLY | O_NONBLOCK);
if(-1==writefd)
{
perror("bftee: writefd: open()");
close(readfd);
exit(EXIT_FAILURE);
}
close(readfd);
InitQueue(&queue, bufferSize);
quit = 0;
while(!quit)
{
// read from STDIN
bytes = read(STDIN_FILENO, buffer.data, sizeof(buffer.data));
// if read failed due to interrupt, then retry, otherwise STDIN has closed and we should stop reading
if (bytes < 0 && errno == EINTR) continue;
if (bytes <= 0) break;
// save the number if read bytes in the current buffer to be processed
buffer.bytes = bytes;
// this is a blocking write. As long as buffer is smaller than 4096 Bytes, the write is atomic to a pipe in Linux
// thus, this cannot be interrupted. however, to be save this should handle the error cases of partial or interrupted write none the less.
bytes = write(STDOUT_FILENO, buffer.data, buffer.bytes);
queue.sWrites++;
if(-1==bytes) {
perror("ftee: writing to stdout");
break;
}
sBuffer *tmpBuffer = NULL;
// if the queue is empty (tmpBuffer gets set to NULL) the this does nothing - otherwise it tries to write
// the buffered data to the pipe. This continues until the Buffer is empty or the write fails.
// NOTE: bytes cannot be -1 (that would have failed just before) when the loop is entered.
while ((bytes != -1) && (tmpBuffer = PeakAtQueue(&queue)) != NULL) {
// write the oldest buffer to the pipe
bytes = write(writefd, tmpBuffer->data, tmpBuffer->bytes);
// the written bytes are equal to the buffer size, the write is successful - remove the buffer and continue
if (bytes == tmpBuffer->bytes) {
DelFromQueue(&queue);
queue.pWrites++;
} else if (bytes > 0) {
// on a positive bytes value there was a partial write. we shrink the current buffer
// and handle this as a write failure
ShrinkInQueue(&queue, bytes);
bytes = -1;
}
}
// There are several cases here:
// 1.) The Queue is empty -> bytes is still set from the write to STDOUT. in this case, we try to write the read data directly to the pipe
// 2.) The Queue was not empty but is now -> bytes is set from the last write (which was successful) and is bigger 0. also try to write the data
// 3.) The Queue was not empty and still is not -> there was a write error before (even partial), and bytes is -1. Thus this line is skipped.
if (bytes != -1) bytes = write(writefd, buffer.data, buffer.bytes);
// again, there are several cases what can happen here
// 1.) the write before was successful -> in this case bytes is equal to buffer.bytes and nothing happens
// 2.) the write just before is partial or failed all together - bytes is either -1 or smaller than buffer.bytes -> add the remaining data to the queue
// 3.) the write before did not happen as the buffer flush already had an error. In this case bytes is -1 -> add the remaining data to the queue
if (bytes != buffer.bytes)
PushToQueue(&queue, &buffer, bytes);
else
queue.pWrites++;
}
// once we are done with STDIN, try to flush the buffer to the named pipe
if (queue.active > 0) {
//set output buffer to block - here we wait until we can write everything to the named pipe
// --> this does not seem to work - just in case there is a busy loop that waits for buffer flush aswell.
int saved_flags = fcntl(writefd, F_GETFL);
int new_flags = saved_flags & ~O_NONBLOCK;
int res = fcntl(writefd, F_SETFL, new_flags);
sBuffer *tmpBuffer = NULL;
//TODO: this does not handle partial writes yet
while ((tmpBuffer = PeakAtQueue(&queue)) != NULL) {
int bytes = write(writefd, tmpBuffer->data, tmpBuffer->bytes);
if (bytes != -1) DelFromQueue(&queue);
}
}
close(writefd);
}
/** init a given Queue **/
void InitQueue (sQueue *queue, int bufferSize) {
queue->data = calloc(bufferSize, sizeof(sBuffer));
queue->bufferSize = bufferSize;
queue->start = 0;
queue->end = 0;
queue->active = 0;
queue->maxUse = 0;
queue->drops = 0;
queue->sWrites = 0;
queue->pWrites = 0;
}
/** push a buffer into the Queue**/
void PushToQueue(sQueue *queue, sBuffer *p, int offset)
{
if (offset < 0) offset = 0; // offset cannot be smaller than 0 - if that is the case, we were given an error code. Set it to 0 instead
if (offset == p->bytes) return; // in this case there are 0 bytes to add to the queue. Nothing to write
// this should never happen - offset cannot be bigger than the buffer itself. Panic action
if (offset > p->bytes) {perror("got more bytes to buffer than we read\n"); exit(EXIT_FAILURE);}
// debug output on a partial write. TODO: remove this line
// if (offset > 0 ) fprintf(stderr, "partial write to buffer\n");
// copy the data from the buffer into the queue and remember its size
memcpy(queue->data[queue->end].data, p->data + offset , p->bytes-offset);
queue->data[queue->end].bytes = p->bytes - offset;
// move the buffer forward
queue->end = (queue->end + 1) % queue->bufferSize;
// there is still space in the buffer
if (queue->active < queue->bufferSize)
{
queue->active++;
if (queue->active > queue->maxUse) queue->maxUse = queue->active;
} else {
// Overwriting the oldest. Move start to next-oldest
queue->start = (queue->start + 1) % queue->bufferSize;
queue->drops++;
}
}
/** return the oldest entry in the Queue and remove it or return NULL in case the Queue is empty **/
sBuffer *RetrieveFromQueue(sQueue *queue)
{
if (!queue->active) { return NULL; }
queue->start = (queue->start + 1) % queue->bufferSize;
queue->active--;
return &(queue->data[queue->start]);
}
/** return the oldest entry in the Queue or NULL if the Queue is empty. Does not remove the entry **/
sBuffer *PeakAtQueue(sQueue *queue)
{
if (!queue->active) { return NULL; }
return &(queue->data[queue->start]);
}
/*** Shrinks the oldest entry i the Queue by bytes. Removes the entry if buffer of the oldest entry runs empty*/
void ShrinkInQueue(sQueue *queue, int bytes) {
// cannot remove negative amount of bytes - this is an error case. Ignore it
if (bytes <= 0) return;
// remove the entry if the offset is equal to the buffer size
if (queue->data[queue->start].bytes == bytes) {
DelFromQueue(queue);
return;
};
// this is a partial delete
if (queue->data[queue->start].bytes > bytes) {
//shift the memory by the offset
memmove(queue->data[queue->start].data, queue->data[queue->start].data + bytes, queue->data[queue->start].bytes - bytes);
queue->data[queue->start].bytes = queue->data[queue->start].bytes - bytes;
return;
}
// panic is the are to remove more than we have the buffer
if (queue->data[queue->start].bytes < bytes) {
perror("we wrote more than we had - this should never happen\n");
exit(EXIT_FAILURE);
return;
}
}
/** delete the oldest entry from the queue. Do nothing if the Queue is empty **/
void DelFromQueue(sQueue *queue)
{
if (queue->active > 0) {
queue->start = (queue->start + 1) % queue->bufferSize;
queue->active--;
}
}
/** Stats output on SIGUSR1 **/
static void sigUSR1(int signo) {
fprintf(stderr, "Buffer use: %i (%i/%i), STDOUT: %i PIPE: %i:%i\n", queue.active, queue.maxUse, queue.bufferSize, queue.sWrites, queue.pWrites, queue.drops);
}
/** handle signal for terminating **/
static void sigINT(int signo) {
quit++;
if (quit > 1) exit(EXIT_FAILURE);
}
Tato verze má ještě jeden (nepovinný) argument, který určuje počet bloků, které mají být ukládány do vyrovnávací paměti pro roura. Můj ukázkový hovor nyní vypadá takto:
some_process | bftee >(onlineAnalysis.pl > results) 16384 | gzip > raw_data.gz
Výsledkem je 16 384 bloků, které mají být uloženy do vyrovnávací paměti, než dojde k vyřazení. to spotřebuje asi o 32 Mbyte více paměti, ale... koho to zajímá?
Samozřejmě v reálném prostředí používám pojmenovanou trubku, abych ji mohl podle potřeby připojit a odpojit. Vypadá to takto:
mkfifo named_pipe
some_process | bftee named_pipe 16384 | gzip > raw_data.gz &
cat named_pipe | onlineAnalysis.pl > results
Proces také reaguje na signály následovně:SIGUSR1 -> vytiskne čítače na STDERRSIGTERM, SIGINT -> nejprve opustí hlavní smyčku a vyprázdní vyrovnávací paměť do roury, druhý okamžitě ukončí program.
Možná to někomu v budoucnu pomůže... Užijte si to
Inspirován vaší otázkou jsem napsal jednoduchý program, který vám to umožní:
$ myprogram 2>&1 | ftee /tmp/mylog
Chová se podobně jako tee
ale klonuje stdin do stdout a do pojmenovaného roura (prozatím požadavek) bez blokování. To znamená, že pokud se chcete přihlásit tímto způsobem, může se stát, že ztratíte data protokolu, ale myslím, že je to ve vašem scénáři přijatelné.Trik je v zablokování SIGPIPE
signál a ignorovat chybu při zápisu do nefunkčního fifo. Tento vzorek může být samozřejmě optimalizován různými způsoby, ale zatím to podle mě funguje.
/* ftee - clone stdin to stdout and to a named pipe
(c) [email protected]
WTFPL Licence */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
#include <signal.h>
#include <unistd.h>
int main(int argc, char *argv[])
{
int readfd, writefd;
struct stat status;
char *fifonam;
char buffer[BUFSIZ];
ssize_t bytes;
signal(SIGPIPE, SIG_IGN);
if(2!=argc)
{
printf("Usage:\n someprog 2>&1 | %s FIFO\n FIFO - path to a"
" named pipe, required argument\n", argv[0]);
exit(EXIT_FAILURE);
}
fifonam = argv[1];
readfd = open(fifonam, O_RDONLY | O_NONBLOCK);
if(-1==readfd)
{
perror("ftee: readfd: open()");
exit(EXIT_FAILURE);
}
if(-1==fstat(readfd, &status))
{
perror("ftee: fstat");
close(readfd);
exit(EXIT_FAILURE);
}
if(!S_ISFIFO(status.st_mode))
{
printf("ftee: %s in not a fifo!\n", fifonam);
close(readfd);
exit(EXIT_FAILURE);
}
writefd = open(fifonam, O_WRONLY | O_NONBLOCK);
if(-1==writefd)
{
perror("ftee: writefd: open()");
close(readfd);
exit(EXIT_FAILURE);
}
close(readfd);
while(1)
{
bytes = read(STDIN_FILENO, buffer, sizeof(buffer));
if (bytes < 0 && errno == EINTR)
continue;
if (bytes <= 0)
break;
bytes = write(STDOUT_FILENO, buffer, bytes);
if(-1==bytes)
perror("ftee: writing to stdout");
bytes = write(writefd, buffer, bytes);
if(-1==bytes);//Ignoring the errors
}
close(writefd);
return(0);
}
Můžete jej zkompilovat pomocí tohoto standardního příkazu:
$ gcc ftee.c -o ftee
Můžete to rychle ověřit spuštěním např.:
$ ping www.google.com | ftee /tmp/mylog
$ cat /tmp/mylog
Všimněte si také - nejedná se o žádný multiplexer. Pouze jeden proces může provádět $ cat /tmp/mylog
najednou.
Vypadá to jako bash <>
Operátor přesměrování (3.6.10 Opening File Descriptors for Reading and WritingSee) umožňuje neblokování zápisu do souboru/fifo otevřeného s ním.To by mělo fungovat:
$ mkfifo /tmp/mylog
$ exec 4<>/tmp/mylog
$ myprogram 2>&1 | tee >&4
$ cat /tmp/mylog # on demend
Řešení nabízí gniourf_gniourf na #bash IRC kanálu.
To by však vytvořilo stále rostoucí soubor protokolu, i když by se nepoužíval, dokud na disku nedojde místo.
Proč pravidelně nestřídat protokoly? Existuje dokonce program, který to udělá za vás logrotate
.
Existuje také systém pro generování zpráv protokolu a provádění různých věcí podle typu. Jmenuje se syslog
.
Můžete dokonce kombinovat obojí. Nechte svůj program generovat zprávy syslog, nakonfigurujte syslog tak, aby je umístil do souboru, a použijte logrotate, abyste zajistili, že nezaplní disk.
Pokud se ukázalo, že píšete pro malý vestavěný systém a výstup programu je těžký, můžete zvážit různé techniky.
- Vzdálený syslog:posílejte zprávy syslog na server syslog v síti.
- Použijte úrovně závažnosti dostupné v syslog k provádění různých věcí se zprávami. Např. zahoďte "INFO", ale zaznamenejte a přepošlete "ERR" nebo vyšší. Např. do konzoly
- Použijte ve svém programu obslužný program signálu k opětovnému načtení konfigurace na HUP a tímto způsobem změňte generování protokolu "na vyžádání".
- Nechte svůj program naslouchat na unixovém soketu a po otevření si jej zapište. Tímto způsobem můžete dokonce implementovat interaktivní konzolu do svého programu.
- Pomocí konfiguračního souboru poskytněte podrobné řízení výstupu protokolování.