Flume + ElasticSearch + Kibana: A Complete Walkthrough of Real-Time Log Display

It took a full day, but I successfully fanned logs into ElasticSearch with Flume and got Kibana running for an initial view of the logs. This post records the process.

1: Building the ES cluster is not covered here; see: How to Set Up an ES Cluster

2: Getting Flume and ES to work together

This part is the crux of the whole exercise; it is where most of the time went.

Flume actually ships an ElasticSearchSink among its sinks, and my plan was simply to use it as-is. I then found that my ES version is too new for it, and I did not want to roll back to the old 1.x ES release as many online guides suggest. So I modified the flume-ng-elasticsearch-sink jar that ships with Flume, rebuilt it, and got it running successfully.

Enough preamble; on to the details.

JDK version: 1.8.0_111

Flume version: 1.6.0. If you are unsure how to determine your Flume version, look at the jar names under the lib directory; your Flume version is easy to spot there.
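Alternatively, assuming a standard Flume installation layout, the bundled launcher script can report the version directly; it prints the Flume build version and revision information:

bin/flume-ng version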

ElasticSearch: 5.6.2.

The detailed steps follow:

1:flume-ng-elasticsearch-sink-1.6.0.jar

This jar lives in Flume's lib directory and implements the ElasticSearchSink. Find the matching Flume release on the official site and download the full source code.

I downloaded it from GitHub: matching my version, I located the flume-1.6 branch, copied the whole tree, and imported it into my IDE as a Maven project (I use Eclipse).

After the import you should see a standard Maven module layout; just change the build path (compiler compliance level) to 1.8, for example with the pom snippet below.
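If the IDE keeps resetting the compiler level, it can also be pinned in the pom. This is a minimal sketch using the standard maven-compiler-plugin; it is not part of the original pom, so add it under build/plugins only if you need it:

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-compiler-plugin</artifactId>
    <configuration>
        <!-- compile the modified sink classes for Java 1.8 -->
        <source>1.8</source>
        <target>1.8</target>
    </configuration>
</plugin>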

Right after the import nothing is flagged, but we have to adapt it to our own setup, let it break, and then fix the errors one spot at a time.

First, edit the pom.xml file.

<?xml version="1.0" encoding="UTF-8"?>
<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor 
    license agreements. See the NOTICE file distributed with this work for additional 
    information regarding copyright ownership. The ASF licenses this file to 
    You under the Apache License, Version 2.0 (the "License"); you may not use 
    this file except in compliance with the License. You may obtain a copy of 
    the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required 
    by applicable law or agreed to in writing, software distributed under the 
    License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS 
    OF ANY KIND, either express or implied. See the License for the specific 
    language governing permissions and limitations under the License. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <artifactId>flume-ng-sinks</artifactId>
        <groupId>org.apache.flume</groupId>
        <version>1.6.0</version>
    </parent>
    <groupId>org.apache.flume.flume-ng-sinks</groupId>
    <artifactId>flume-ng-elasticsearch-sink</artifactId>
    <name>Flume NG ElasticSearch Sink</name>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.rat</groupId>
                <artifactId>apache-rat-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
    <dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-sdk</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
        </dependency>
        <!-- Here I changed the original elasticsearch dependency to the 5.6.2 version I need -->
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>5.6.2</version><!--$NO-MVN-MAN-VER$ -->
        </dependency>
        <!-- This dependency must be added, otherwise the build will also fail -->
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>transport</artifactId>
            <version>5.6.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>commons-lang</groupId>
            <artifactId>commons-lang</artifactId>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
        </dependency>
        <dependency>
            <groupId>org.mockito</groupId>
            <artifactId>mockito-all</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- Added for my own purposes; it can be omitted if you do not need it -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.44</version>
        </dependency>
    </dependencies>
</project>

Every place I changed is marked with a comment. Because the ES cluster I need to connect to is 5.6.2, every ES-related dependency version was switched to 5.6.2.

2: After these changes, errors appear all over the place. Errors in the test sources can be ignored; just skip the tests when packaging.

The order of the following steps is not fixed, but once all of them are done, everything should compile cleanly.

1: The ContentBuilderUtil class reports errors. I no longer remember the exact changes, so here is the modified source in full:

package org.apache.flume.sink.elasticsearch;

import java.io.IOException;
import java.nio.charset.Charset;

import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;

/**
 * Utility methods for using ElasticSearch {@link XContentBuilder}
 */
public class ContentBuilderUtil {

    private static final Charset charset = Charset.defaultCharset();

    private ContentBuilderUtil() {
    }

    public static void appendField(XContentBuilder builder, String field,
            byte[] data) throws IOException {
        XContentType contentType = XContentFactory.xContentType(data);
        if (contentType == null) {
            addSimpleField(builder, field, data);
        } else {
            addComplexField(builder, field, contentType, data);
        }
    }

    public static void addSimpleField(XContentBuilder builder,
            String fieldName, byte[] data) throws IOException {
        builder.field(fieldName, new String(data, charset));
    }

    public static void addComplexField(XContentBuilder builder,
            String fieldName, XContentType contentType, byte[] data)
            throws IOException {
        XContentParser parser = XContentFactory.xContent(contentType)
                .createParser(NamedXContentRegistry.EMPTY, data);
        parser.nextToken();
        // Add the field name, but not the value.
        builder.field(fieldName);
        try {
            // This will add the whole parsed content as the value of the field.
            builder.copyCurrentStructure(parser);
        } catch (Exception ex) {
            // If we get an exception here the most likely cause is nested JSON
            // that
            // can't be figured out in the body. At this point just push it
            // through
            // as is, we have already added the field so don't do it again
            builder.endObject();
            builder.field(fieldName, new String(data, charset));
        } finally {
            if (parser != null) {
                parser.close();
            }
        }
    }
}
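As a quick, hypothetical illustration (not part of the sink source) of how appendField branches between simple and complex fields, here is a minimal usage sketch:

import java.io.IOException;

import org.apache.flume.sink.elasticsearch.ContentBuilderUtil;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;

public class ContentBuilderUtilDemo {
    public static void main(String[] args) throws IOException {
        XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
        // A JSON body is detected by XContentFactory.xContentType() and copied in as a nested object
        ContentBuilderUtil.appendField(builder, "@message",
                "{\"user\":\"jordan\",\"command\":\"shutdown -r\"}".getBytes());
        // Plain text is not recognized as XContent and falls back to a simple string field
        ContentBuilderUtil.appendField(builder, "@type", "syslog".getBytes());
        builder.endObject();
        System.out.println(builder.string());
    }
}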

2: ElasticSearchEventSerializer reports errors

package org.apache.flume.sink.elasticsearch;

import java.io.IOException;
import java.nio.charset.Charset;

import org.apache.flume.Event;
import org.apache.flume.conf.Configurable;
import org.apache.flume.conf.ConfigurableComponent;
import org.elasticsearch.common.xcontent.XContentBuilder;

/**
 * Interface for an event serializer which serializes the headers and body of an
 * event to write them to ElasticSearch. This is configurable, so any config
 * params required should be taken through this.
 */
public interface ElasticSearchEventSerializer extends Configurable,
        ConfigurableComponent {

    public static final Charset charset = Charset.defaultCharset();

    /**
     * Return an {@link XContentBuilder} made up of the serialized flume event
     * 
     * @param event
     *            The flume event to serialize
     * @return An {@link XContentBuilder} used to write to ElasticSearch
     * @throws IOException
     *             If an error occurs during serialization
     */
    abstract XContentBuilder getContentBuilder(Event event) throws IOException;
}

3:ElasticSearchLogStashEventSerializer

package org.apache.flume.sink.elasticsearch;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.conf.ComponentConfiguration;
import org.elasticsearch.common.xcontent.XContentBuilder;

/**
 * Serialize flume events into the same format LogStash uses</p>
 *
 * This can be used to send events to ElasticSearch and use clients such as
 * Kibana which expect Logstash formatted indexes
 *
 * <pre>
 * {
 *    "@timestamp": "2010-12-21T21:48:33.309258Z",
 *    "@tags": [ "array", "of", "tags" ],
 *    "@type": "string",
 *    "@source": "source of the event, usually a URL."
 *    "@source_host": ""
 *    "@source_path": ""
 *    "@fields":{
 *       # a set of fields for this event
 *       "user": "jordan",
 *       "command": "shutdown -r":
 *     }
 *     "@message": "the original plain-text message"
 *   }
 * </pre>
 *
 * If the following headers are present, they will map to the above logstash
 * output as long as the logstash fields are not already present.</p>
 *
 * <pre>
 *  timestamp: long -> @timestamp:Date
 *  host: String -> @source_host: String
 *  src_path: String -> @source_path: String
 *  type: String -> @type: String
 *  source: String -> @source: String
 * </pre>
 *
 * @see https://github.com/logstash/logstash/wiki/logstash%27s-internal-message-format
 */
public class ElasticSearchLogStashEventSerializer implements
        ElasticSearchEventSerializer {

    @Override
    public XContentBuilder getContentBuilder(Event event) throws IOException {
        XContentBuilder builder = jsonBuilder().startObject();
        appendBody(builder, event);
        appendHeaders(builder, event);
        return builder;
    }

    private void appendBody(XContentBuilder builder, Event event)
            throws IOException, UnsupportedEncodingException {
        byte[] body = event.getBody();
        ContentBuilderUtil.appendField(builder, "@message", body);
    }

    private void appendHeaders(XContentBuilder builder, Event event)
            throws IOException {
        Map<String, String> headers = new HashMap<String, String>(
                event.getHeaders());

        String timestamp = headers.get("timestamp");
        if (!StringUtils.isBlank(timestamp)
                && StringUtils.isBlank(headers.get("@timestamp"))) {
            long timestampMs = Long.parseLong(timestamp);
            builder.field("@timestamp", new Date(timestampMs));
        }

        String source = headers.get("source");
        if (!StringUtils.isBlank(source)
                && StringUtils.isBlank(headers.get("@source"))) {
            ContentBuilderUtil.appendField(builder, "@source",
                    source.getBytes(charset));
        }

        String type = headers.get("type");
        if (!StringUtils.isBlank(type)
                && StringUtils.isBlank(headers.get("@type"))) {
            ContentBuilderUtil.appendField(builder, "@type",
                    type.getBytes(charset));
        }

        String host = headers.get("host");
        if (!StringUtils.isBlank(host)
                && StringUtils.isBlank(headers.get("@source_host"))) {
            ContentBuilderUtil.appendField(builder, "@source_host",
                    host.getBytes(charset));
        }

        String srcPath = headers.get("src_path");
        if (!StringUtils.isBlank(srcPath)
                && StringUtils.isBlank(headers.get("@source_path"))) {
            ContentBuilderUtil.appendField(builder, "@source_path",
                    srcPath.getBytes(charset));
        }

        builder.startObject("@fields");
        for (String key : headers.keySet()) {
            byte[] val = headers.get(key).getBytes(charset);
            ContentBuilderUtil.appendField(builder, key, val);
        }
        builder.endObject();
    }

    @Override
    public void configure(Context context) {
        // NO-OP...
    }

    @Override
    public void configure(ComponentConfiguration conf) {
        // NO-OP...
    }
}

4:ElasticSearchTransportClient

package org.apache.flume.sink.elasticsearch.client;

import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.DEFAULT_PORT;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer;
import org.apache.flume.sink.elasticsearch.ElasticSearchIndexRequestBuilderFactory;
import org.apache.flume.sink.elasticsearch.IndexNameBuilder;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.mortbay.util.ajax.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;

public class ElasticSearchTransportClient implements ElasticSearchClient {

    public static final Logger logger = LoggerFactory
            .getLogger(ElasticSearchTransportClient.class);

    private InetSocketTransportAddress[] serverAddresses;
    private ElasticSearchEventSerializer serializer;
    private ElasticSearchIndexRequestBuilderFactory indexRequestBuilderFactory;
    private BulkRequestBuilder bulkRequestBuilder;

    private Client client;

    @VisibleForTesting
    InetSocketTransportAddress[] getServerAddresses() {
        return serverAddresses;
    }

    @VisibleForTesting
    void setBulkRequestBuilder(BulkRequestBuilder bulkRequestBuilder) {
        this.bulkRequestBuilder = bulkRequestBuilder;
    }

    /**
     * Transport client for external cluster
     * 
     * @param hostNames
     * @param clusterName
     * @param serializer
     */
    public ElasticSearchTransportClient(String[] hostNames, String clusterName,
            ElasticSearchEventSerializer serializer) {
        configureHostnames(hostNames);
        this.serializer = serializer;
        openClient(clusterName);
    }

    public ElasticSearchTransportClient(String[] hostNames, String clusterName,
            ElasticSearchIndexRequestBuilderFactory indexBuilder) {
        configureHostnames(hostNames);
        this.indexRequestBuilderFactory = indexBuilder;
        openClient(clusterName);
    }

    /**
     * Local transport client only for testing
     * 
     * @param indexBuilderFactory
     */
    // public ElasticSearchTransportClient(
    // ElasticSearchIndexRequestBuilderFactory indexBuilderFactory) {
    // this.indexRequestBuilderFactory = indexBuilderFactory;
    // openLocalDiscoveryClient();
    // }

    /**
     * Local transport client only for testing
     *
     * @param serializer
     */
    // public ElasticSearchTransportClient(ElasticSearchEventSerializer
    // serializer) {
    // this.serializer = serializer;
    // openLocalDiscoveryClient();
    // }

    /**
     * Used for testing
     *
     * @param client
     *            ElasticSearch Client
     * @param serializer
     *            Event Serializer
     */
    public ElasticSearchTransportClient(Client client,
            ElasticSearchEventSerializer serializer) {
        this.client = client;
        this.serializer = serializer;
    }

    /**
     * Used for testing
     *
     * @param client
     *            ElasticSearch Client
     * @param serializer
     *            Event Serializer
     */
    public ElasticSearchTransportClient(Client client,
            ElasticSearchIndexRequestBuilderFactory requestBuilderFactory)
            throws IOException {
        this.client = client;
        requestBuilderFactory.createIndexRequest(client, null, null, null);
    }

    private void configureHostnames(String[] hostNames) {
        logger.warn(Arrays.toString(hostNames));
        serverAddresses = new InetSocketTransportAddress[hostNames.length];
        for (int i = 0; i < hostNames.length; i++) {
            String[] hostPort = hostNames[i].trim().split(":");
            String host = hostPort[0].trim();
            int port = hostPort.length == 2 ? Integer.parseInt(hostPort[1]
                    .trim()) : DEFAULT_PORT;
            // Modified here: resolve the host name explicitly and build the transport address
            try {
                serverAddresses[i] = new InetSocketTransportAddress(
                        InetAddress.getByName(host), port);
            } catch (UnknownHostException e) {
                e.printStackTrace();
            }
        }
    }

    @Override
    public void close() {
        if (client != null) {
            client.close();
        }
        client = null;
    }

    /**
     * @description: convert an exception's output (stack trace) into a String
     * @author: yuzhao.yang
     * @time: 2017-06-07 10:27
     */
    public String transfer(Exception e) throws Exception {
        // e.printStackTrace();
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        e.printStackTrace(new PrintWriter(buf, true));
        String expMessage = buf.toString();
        buf.close();
        if (null != expMessage) {
            return expMessage;
        } else {
            return null;
        }
    }

    @Override
    public void addEvent(Event event, IndexNameBuilder indexNameBuilder,
            String indexType, long ttlMs) throws Exception {
        if (bulkRequestBuilder == null) {
            bulkRequestBuilder = client.prepareBulk();
        }

        IndexRequestBuilder indexRequestBuilder = null;
        if (indexRequestBuilderFactory == null) {
            Map<String, ?> map = null;
            try {
                String body = new String(event.getBody());
                logger.error("Event body: " + body);
                map = (Map<String, ?>) JSON.parse(body);
            } catch (Exception e) {
                logger.error("getContentBuilder exception: " + transfer(e));
            }

            indexRequestBuilder = client.prepareIndex(
                    indexNameBuilder.getIndexName(event), indexType).setSource(
                    map);
        } else {
            indexRequestBuilder = indexRequestBuilderFactory
                    .createIndexRequest(client,
                            indexNameBuilder.getIndexPrefix(event), indexType,
                            event);
        }

        if (ttlMs > 0) {
            indexRequestBuilder.setTTL(ttlMs);
        }
        bulkRequestBuilder.add(indexRequestBuilder);
    }

    @Override
    public void execute() throws Exception {
        try {
            BulkResponse bulkResponse = bulkRequestBuilder.execute()
                    .actionGet();
            if (bulkResponse.hasFailures()) {
                throw new EventDeliveryException(
                        bulkResponse.buildFailureMessage());
            }
        } finally {
            bulkRequestBuilder = client.prepareBulk();
        }
    }

    /**
     * Open client to elasticsearch cluster
     * 
     * @param clusterName
     */
    private void openClient(String clusterName) {
        Settings settings = Settings.builder().put("cluster.name", clusterName)
                .build();

        // TransportClient transportClient = new TransportClient(settings);
        // for (InetSocketTransportAddress host : serverAddresses) {
        // transportClient.addTransportAddress(host);
        // }

        TransportClient transportClient = null;
        for (InetSocketTransportAddress host : serverAddresses) {
            if (null == transportClient) {
                transportClient = new PreBuiltTransportClient(settings)
                        .addTransportAddress(host);
            } else {
                transportClient = transportClient.addTransportAddress(host);
            }
        }
        if (client != null) {
            client.close();
        }
        client = transportClient;
    }

    /*
     * FOR TESTING ONLY...
     * 
     * Opens a local discovery node for talking to an elasticsearch server
     * running in the same JVM
     */
    // private void openLocalDiscoveryClient() {
    // logger.info("Using ElasticSearch AutoDiscovery mode");
    // Node node = NodeBuilder.nodeBuilder().client(true).local(true).node();
    // if (client != null) {
    // client.close();
    // }
    // client = node.client();
    // }

    @Override
    public void configure(Context context) {
        // To change body of implemented methods use File | Settings | File
        // Templates.
    }
}

In this class my main change is inside addEvent: building the document with XContentBuilder directly kept throwing errors, so I switched to converting the data through a Map instead.

Little else needs to be modified.

I may have forgotten some of the changes; the modified code is on GitHub: link to the modified code

Note: my data is in map (JSON) format, which is why the transport client was modified as shown above. If your data comes in a different format, think through your own conversion; an illustrative body is shown below.
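For illustration only (the field names here are hypothetical, not from my actual logs), this is the kind of event body the modified addEvent expects, since the body is parsed with JSON.parse into a Map and passed to setSource:

{"appName": "order-service", "level": "ERROR", "message": "request timed out", "timestamp": 1508311833000}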

Once the code changes are done, package the module directly. Although the pom.xml declares no packaging plugin, the default Maven packaging lifecycle is enough, and Maven downloads the plugins it needs automatically; see the command below.
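The packaging step is just the standard Maven lifecycle, skipping the tests as noted earlier; run it from the flume-ng-elasticsearch-sink module directory (assuming Maven is installed locally):

mvn clean package -DskipTests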

The resulting jar is small because most dependency jars are not bundled into it, so the next step is to update and replace jars under Flume's lib directory. For convenience, here is the complete list of jars in my Flume lib directory:

apache-log4j-extras-1.1.jar            flume-ng-kafka-sink-1.6.0.jar             kite-data-core-1.0.0.jar             parquet-avro-1.4.1.jar
async-1.4.0.jar                        flume-ng-log4jappender-1.6.0.jar          kite-data-hbase-1.0.0.jar            parquet-column-1.4.1.jar
asynchbase-1.5.0.jar                   flume-ng-morphline-solr-sink-1.6.0.jar    kite-data-hive-1.0.0.jar             parquet-common-1.4.1.jar
avro-1.7.4.jar                         flume-ng-node-1.6.0.jar                   kite-hadoop-compatibility-1.0.0.jar  parquet-encoding-1.4.1.jar
avro-ipc-1.7.4.jar                     flume-ng-sdk-1.6.0.jar                    lang-mustache-client-5.6.2.jar       parquet-format-2.0.0.jar
commons-cli-1.2.jar                    flume-scribe-source-1.6.0.jar             libthrift-0.9.0.jar                  parquet-generator-1.4.1.jar
commons-codec-1.8.jar                  flume-spillable-memory-channel-1.6.0.jar  log4j-1.2.17.jar                     parquet-hadoop-1.4.1.jar
commons-collections-3.2.1.jar          flume-thrift-source-1.6.0.jar             log4j-api-2.9.1.jar                  parquet-hive-bundle-1.4.1.jar
commons-compress-1.4.1.jar             flume-tools-1.6.0.jar                     lucene-analyzers-common-6.6.1.jar    parquet-jackson-1.4.1.jar
commons-dbcp-1.4.jar                   flume-twitter-source-1.6.0.jar            lucene-backward-codecs-6.6.1.jar     percolator-client-5.6.2.jar
commons-io-2.1.jar                     gson-2.2.2.jar                            lucene-core-6.6.1.jar                plugin-cli-5.6.2.jar
commons-jexl-2.1.1.jar                 guava-11.0.2.jar                          lucene-grouping-6.6.1.jar            protobuf-java-2.5.0.jar
commons-lang-2.5.jar                   HdrHistogram-2.1.9.jar                    lucene-highlighter-6.6.1.jar         reindex-client-5.6.2.jar
commons-logging-1.1.1.jar              hppc-0.7.1.jar                            lucene-join-6.6.1.jar                scala-library-2.10.1.jar
commons-pool-1.5.4.jar                 httpclient-4.2.1.jar                      lucene-memory-6.6.1.jar              securesm-1.1.jar
curator-client-2.6.0.jar               httpcore-4.1.3.jar                        lucene-misc-6.6.1.jar                serializer-2.7.2.jar
curator-framework-2.6.0.jar            irclib-1.10.jar                           lucene-queries-6.6.1.jar             servlet-api-2.5-20110124.jar
curator-recipes-2.6.0.jar              jackson-annotations-2.3.0.jar             lucene-queryparser-6.6.1.jar         slf4j-api-1.6.1.jar
derby-10.8.2.2.jar                     jackson-core-2.8.6.jar                    lucene-sandbox-6.6.1.jar             slf4j-log4j12-1.6.1.jar
elasticsearch-5.6.2.jar                jackson-core-asl-1.9.3.jar                lucene-spatial3d-6.6.1.jar           snakeyaml-1.15.jar
flume-avro-source-1.6.0.jar            jackson-databind-2.3.1.jar                lucene-spatial-6.6.1.jar             snappy-java-1.1.0.jar
flume-dataset-sink-1.6.0.jar           jackson-dataformat-cbor-2.8.6.jar         lucene-spatial-extras-6.6.1.jar      spatial4j-0.6.jar
flume-file-channel-1.6.0.jar           jackson-dataformat-smile-2.8.6.jar        lucene-suggest-6.6.1.jar             t-digest-3.0.jar
flume-hdfs-sink-1.6.0.jar              jackson-dataformat-yaml-2.8.6.jar         mapdb-0.9.9.jar                      transport-5.6.2.jar
flume-hive-sink-1.6.0.jar              jackson-mapper-asl-1.9.3.jar              metrics-core-2.2.0.jar               transport-netty3-client-5.6.2.jar
flume-irc-sink-1.6.0.jar               java-version-checker-5.6.2.jar            mina-core-2.0.4.jar                  transport-netty4-client-5.6.2.jar
flume-jdbc-channel-1.6.0.jar           jetty-6.1.26.jar                          netty-3.5.12.Final.jar               twitter4j-core-3.0.3.jar
flume-jms-source-1.6.0.jar             jetty-util-6.1.26.jar                     netty-buffer-4.1.13.Final.jar        twitter4j-media-support-3.0.3.jar
flume-kafka-channel-1.6.0.jar          jna-4.4.0-1.jar                           netty-codec-4.1.13.Final.jar         twitter4j-stream-3.0.3.jar
flume-kafka-source-1.6.0.jar           joda-time-2.1.jar                         netty-common-4.1.13.Final.jar        velocity-1.7.jar
flume-ng-auth-1.6.0.jar                joda-time-2.9.5.jar                       netty-handler-4.1.13.Final.jar       xalan-2.7.2.jar
flume-ng-configuration-1.6.0.jar       jopt-simple-3.2.jar                       netty-resolver-4.1.13.Final.jar      xercesImpl-2.9.1.jar
flume-ng-core-1.6.0.jar                jopt-simple-5.0.2.jar                     netty-transport-4.1.13.Final.jar     xml-apis-1.3.04.jar
flume-ng-elasticsearch-sink-1.6.0.jar  jsr305-1.3.9.jar                          opencsv-2.3.jar                      xz-1.0.jar
flume-ng-embedded-agent-1.6.0.jar      jts-1.13.jar                              paranamer-2.3.jar                    zkclient-0.3.jar
flume-ng-hbase-sink-1.6.0.jar          kafka_2.10-0.8.1.1.jar                    parent-join-client-5.6.2.jar

I first copied over every jar from ElasticSearch's lib directory, then added whatever non-duplicate jars my own project needed, and in the end everything ran normally.

Checking the logs confirmed that the data was indeed delivered into ES. Problem solved.
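For completeness, the sink side of the agent configuration would look something like the sketch below. This is an assumption-laden example rather than my exact setup: the agent, sink, and channel names, the index name, and the cluster name are placeholders; hostNames must point at the ES transport port (9300 by default), and clusterName must match the cluster.name of the ES cluster:

agent.sinks.esSink.type = org.apache.flume.sink.elasticsearch.ElasticSearchSink
agent.sinks.esSink.channel = memoryChannel
agent.sinks.esSink.hostNames = 192.168.100.34:9300
agent.sinks.esSink.indexName = flume_log
agent.sinks.esSink.indexType = logs
agent.sinks.esSink.clusterName = my-es-cluster
agent.sinks.esSink.batchSize = 500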

3: Basic Kibana usage

I am using kibana-5.6.2-linux-x86_64, which you can download directly from the ES official site. Unpack it and edit the configuration file; here is mine:

# Kibana is served by a back end server. This setting specifies the port to use.
server.port: 5601

# Specifies the address to which the Kibana server will bind. IP addresses and host names are both valid values.
# The default is 'localhost', which usually means remote machines will not be able to connect.
# To allow connections from remote users, set this parameter to a non-loopback address.
server.host: "0.0.0.0"

# Enables you to specify a path to mount Kibana at if you are running behind a proxy. This only affects
# the URLs generated by Kibana, your proxy is expected to remove the basePath value before forwarding requests
# to Kibana. This setting cannot end in a slash.
#server.basePath: ""

# The maximum payload size in bytes for incoming server requests.
#server.maxPayloadBytes: 1048576

# The Kibana server's name.  This is used for display purposes.
#server.name: "your-hostname"

# The URL of the Elasticsearch instance to use for all your queries.
elasticsearch.url: "http://192.168.100.34:9200"

# When this setting's value is true Kibana uses the hostname specified in the server.host
# setting. When the value of this setting is false, Kibana uses the hostname of the host
# that connects to this Kibana instance.
#elasticsearch.preserveHost: true

# Kibana uses an index in Elasticsearch to store saved searches, visualizations and
# dashboards. Kibana creates a new index if the index doesn't already exist.
kibana.index: ".kibana"

# The default application to load.
#kibana.defaultAppId: "discover"

# If your Elasticsearch is protected with basic authentication, these settings provide
# the username and password that the Kibana server uses to perform maintenance on the Kibana
# index at startup. Your Kibana users still need to authenticate with Elasticsearch, which
# is proxied through the Kibana server.
#elasticsearch.username: "user"
#elasticsearch.password: "pass"

# Enables SSL and paths to the PEM-format SSL certificate and SSL key files, respectively.
# These settings enable SSL for outgoing requests from the Kibana server to the browser.
#server.ssl.enabled: false
#server.ssl.certificate: /path/to/your/server.crt
#server.ssl.key: /path/to/your/server.key

# Optional settings that provide the paths to the PEM-format SSL certificate and key files.
# These files validate that your Elasticsearch backend uses the same key files.
#elasticsearch.ssl.certificate: /path/to/your/client.crt
#elasticsearch.ssl.key: /path/to/your/client.key

# Optional setting that enables you to specify a path to the PEM file for the certificate
# authority for your Elasticsearch instance.
#elasticsearch.ssl.certificateAuthorities: [ "/path/to/your/CA.pem" ]

# To disregard the validity of SSL certificates, change this setting's value to 'none'.
#elasticsearch.ssl.verificationMode: full

# Time in milliseconds to wait for Elasticsearch to respond to pings. Defaults to the value of
# the elasticsearch.requestTimeout setting.
#elasticsearch.pingTimeout: 1500

# Time in milliseconds to wait for responses from the back end or Elasticsearch. This value
# must be a positive integer.
#elasticsearch.requestTimeout: 30000

# List of Kibana client-side headers to send to Elasticsearch. To send *no* client-side
# headers, set this value to [] (an empty list).
#elasticsearch.requestHeadersWhitelist: [ authorization ]

# Header names and values that are sent to Elasticsearch. Any custom headers cannot be overwritten
# by client-side headers, regardless of the elasticsearch.requestHeadersWhitelist configuration.
#elasticsearch.customHeaders: {}

# Time in milliseconds for Elasticsearch to wait for responses from shards. Set to 0 to disable.
#elasticsearch.shardTimeout: 0

# Time in milliseconds to wait for Elasticsearch at Kibana startup before retrying.
#elasticsearch.startupTimeout: 5000

# Specifies the path where Kibana creates the process ID file.
#pid.file: /var/run/kibana.pid

# Enables you specify a file where Kibana stores log output.
#logging.dest: stdout

# Set the value of this setting to true to suppress all logging output.
#logging.silent: false

# Set the value of this setting to true to suppress all logging output other than error messages.
#logging.quiet: false

# Set the value of this setting to true to log all events, including system usage information
# and all requests.
#logging.verbose: false

# Set the interval in milliseconds to sample system and process performance
# metrics. Minimum is 100ms. Defaults to 5000.
#ops.interval: 5000

# The default locale. This locale can be used in certain circumstances to substitute any missing
# translations.
#i18n.defaultLocale: "en"

The main changes are server.host and elasticsearch.url; the settings are easy to follow.
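Before starting Kibana, it can be worth confirming that the configured elasticsearch.url is reachable from the Kibana host and that the Flume index actually exists (the address below is the one from my config; adjust it to your environment):

curl http://192.168.100.34:9200
curl http://192.168.100.34:9200/_cat/indices?v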

Start it up; success.

In a browser, go to hostIp:5601.

Congratulations, you are looking at Kibana's start page. How to actually use Kibana is a topic for another article.
